//The Job
Category: Crypto
Author: L3G10N
Remote: nc [remote.infoseciitr.in](http://remote.infoseciitr.in) 4006
Flag: flag{h0w_d1d_h3_b3c0m3_th3_m4n4g3r}
>1. Challenge Snapshot
"All you have to do is create a hash polynomial that equally divides the input."
- Server expects us to supply polynomial coefficients modulo $10^9+7$.
- The polynomial is evaluated on 896 leaked inputs; the value mod $10^9+7$ is taken as the hash slot (0–255).
- The manager deems a polynomial valid when no slot exceeds $\lceil 896/256 \rceil = 4$ elements.
- Bonus twist: a hidden "junk" value already occupies one unknown slot. After six manager trials we must name that slot.
So the real problem is interactive set search under a strict per-slot cap.
>2. Reading the Server
Key snippets from server.py:
k = 256
mod = 10**9+7
n = 896
number_array = [rand.randint(0,mod-1) for i in range(n)]
...
hash_table = [[] for i in range(k)]
hash = get_hash(number_array[i], coeff_arr)
if hash >= k: # immediate death if polynomial outputs ≥256
...
hash_table[hash].append(number_array[i])
if len(hash_table[i]) > (n+k-1)/k: # i.e. >4
fail()
...
target = rand.randint(0, k-1)
junk = rand.randint(0, mod-1)
During the six free trials, the hidden target slot already contains junk. We submit six different polynomials, learning only whether each one kept all slots ≤4. After that, we must guess target exactly.
Observations
- All leaked inputs are distinct. That eliminates collisions during interpolation.
- Cap is 4. If we can force exactly four hashes into a set of slots and keep every other slot at ≤3, the manager response becomes a yes/no membership oracle for that slot set.
get_hashevaluates coefficients from highest degree downward. We must send coefficients in that exact order (highest term first) to match the server.
>3. Strategy
We need two capabilities:
- Construct any desired slot distribution while respecting the cap.
- Use six yes/no oracle queries to isolate the target slot.
3.1 Balanced Distributions by Design
Let $S$ be the candidate slot set we want to test (size ≤256). We build a histogram counts[bucket] such that:
- Buckets in
Sget exactly four assignments. - Buckets not in
Sbut still possible targets get ≤3 assignments (so they do not trigger failure if the junk is elsewhere). - Remaining buckets (already known dead) soak up the leftover inputs while staying ≤4.
Because we control the mapping from each leaked input x_i to an arbitrary slot bucket_i < 256, the counts vector fully determines whether the manager says "passed".
3.2 Interpolating the Required Hash Polynomial
We have 896 points (x_i, bucket_i) over the prime field $\mathbb{F}_{10^9+7}`. Interpolating a degree-895 polynomial through all of them is straightforward via standard incremental Lagrange basis construction.
Constraints:
- Degree < 896 (as required by the prompt).
- All hash values already reside in
[0, 255], so the manager never complains about non-existent slots.
3.3 Oracle-Style Search
Having a membership oracle, we simply run a breadth-first split:
- Start with all 256 slots.
- Query half of them; if the manager says "failed", target is inside; otherwise it is outside.
- Repeat with the surviving subset.
Six rounds shrink $256 / 2^6 = 4$ slots. At that point we guess uniformly (25% success). Our automation keeps restarting until the guess hits—usually within a few attempts.
>4. Implementation
4.1 Helper Library — solver.py
import sys
from typing import List, Sequence, Iterable
MOD = 10**9 + 7
def modinv(x: int) -> int:
return pow(x % MOD, MOD - 2, MOD)
def poly_eval(coeffs: List[int], x: int) -> int:
res = 0
for coeff in reversed(coeffs):
res = (res * x + coeff) % MOD
return res
def poly_add(p: List[int], q: List[int]) -> List[int]:
if len(p) < len(q):
p, q = q, p
res = p[:] # copy longer
for idx in range(len(q)):
res[idx] = (res[idx] + q[idx]) % MOD
return res
def poly_scale(p: List[int], factor: int) -> List[int]:
return [(coeff * factor) % MOD for coeff in p]
def poly_mul_x_minus_a(p: List[int], a: int) -> List[int]:
res = [0] * (len(p) + 1)
for idx, coeff in enumerate(p):
res[idx] = (res[idx] - coeff * a) % MOD
res[idx + 1] = (res[idx + 1] + coeff) % MOD
return res
def interpolate(xs: List[int], ys: List[int]) -> List[int]:
if len(xs) != len(ys):
raise ValueError("Point lists must be the same length")
poly = [0]
basis = [1]
for idx, (x_val, y_val) in enumerate(zip(xs, ys)):
y_val %= MOD
val = poly_eval(poly, x_val)
denom = 1
for prev in range(idx):
denom = (denom * (x_val - xs[prev])) % MOD
delta = (y_val - val) % MOD
denom_inv = modinv(denom)
delta = (delta * denom_inv) % MOD
poly = poly_add(poly, poly_scale(basis, delta))
basis = poly_mul_x_minus_a(basis, x_val)
return [c % MOD for c in poly]
def assign_counts(k: int, n: int, candidate_set: Sequence[int], test_subset: Sequence[int]) -> List[int]:
candidate = set(candidate_set)
subset = set(test_subset)
if not subset.issubset(candidate):
raise ValueError("Subset must be within candidate set")
counts = [0] * k
for bucket in subset:
counts[bucket] = 4
remaining = n - 4 * len(subset)
dead = [idx for idx in range(k) if idx not in candidate]
# Fill dead buckets up to 4 first so they never influence manager feedback.
for bucket in dead:
if remaining == 0:
break
take = min(4, remaining)
counts[bucket] = take
remaining -= take
# Alive buckets not under test can take at most 3 entries.
for bucket in candidate:
if bucket in subset:
continue
if remaining == 0:
break
take = min(3, remaining)
counts[bucket] = take
remaining -= take
# If numbers are still left, spread them over dead buckets that have spare capacity.
if remaining:
for bucket in dead:
if remaining == 0:
break
spare = 4 - counts[bucket]
if spare <= 0:
continue
take = min(spare, remaining)
counts[bucket] += take
remaining -= take
if remaining:
raise ValueError("Unable to place all numbers with current subset choice")
if sum(counts) != n:
raise ValueError("Counts do not sum to total numbers")
return counts
def counts_to_targets(counts: Sequence[int], n: int) -> List[int]:
targets: List[int] = []
for bucket, count in enumerate(counts):
targets.extend([bucket] * count)
if len(targets) != n:
raise ValueError("Target list length mismatch")
return targets
def poly_from_counts(xs: Sequence[int], counts: Sequence[int]) -> List[int]:
targets = counts_to_targets(counts, len(xs))
return interpolate(list(xs), targets)
4.2 Exploit Automation — exploit.py
import argparse
import random
from typing import Callable, List, Sequence
from pwn import context, process, remote
from solver import assign_counts, counts_to_targets, poly_from_counts
MOD = 10**9 + 7
K = 256
N = 896
def parse_numbers(line: str) -> List[int]:
return [int(part) for part in line.strip().split(',') if part]
def coeffs_to_string(coeffs: Sequence[int]) -> str:
ordered = list(reversed(coeffs))
return ','.join(str(coeff % MOD) for coeff in ordered)
def eval_server_hash(coeff_arr: Sequence[int], num: int) -> int:
res = 0
mult = 1
L = len(coeff_arr)
for idx in range(L):
res = (res + mult * coeff_arr[L - 1 - idx]) % MOD
mult = (mult * num) % MOD
return res
def validate_against_server(coeff_str: str, numbers: Sequence[int], targets: Sequence[int]) -> None:
coeff_arr = [int(token) for token in coeff_str.split(',') if token]
for idx, (num, target) in enumerate(zip(numbers, targets)):
if eval_server_hash(coeff_arr, num) != target:
raise RuntimeError(f'Eval mismatch at index {idx}')
def choose_subset(candidates: Sequence[int]) -> List[int]:
cand_list = sorted(candidates)
size = len(cand_list)
if size <= 1:
return cand_list
min_size = max(size - 128, 1)
target_size = max(size // 2, min_size)
return cand_list[:target_size]
def solve_session(io) -> str:
io.recvuntil(b'Press Enter to start > ')
io.sendline(b'')
io.recvuntil(b'Here are the leaked numbers : ')
numbers_line = io.recvline().strip().decode()
numbers = parse_numbers(numbers_line)
if len(numbers) != N:
raise RuntimeError('Unexpected number count: %d' % len(numbers))
candidate_set = set(range(K))
first_subset = set(choose_subset(candidate_set))
counts = assign_counts(K, N, candidate_set, first_subset)
coeffs = poly_from_counts(numbers, counts)
targets = counts_to_targets(counts, N)
coeff_str = coeffs_to_string(coeffs)
validate_against_server(coeff_str, numbers, targets)
io.recvuntil(b'> ')
io.sendline(coeff_str.encode())
line = io.recvline().decode()
if 'failed' in line.lower():
raise RuntimeError('Stage 1 polynomial rejected')
io.recvuntil(b'Press Enter to continue > ')
io.sendline(b'')
for turn in range(6):
if turn == 0:
subset = set(first_subset)
coeff_line = coeff_str
else:
subset = set(choose_subset(candidate_set))
counts = assign_counts(K, N, candidate_set, subset)
coeffs = poly_from_counts(numbers, counts)
targets = counts_to_targets(counts, N)
coeff_line = coeffs_to_string(coeffs)
validate_against_server(coeff_line, numbers, targets)
io.recvuntil(f'Trial {turn + 1} :'.encode())
io.recvuntil(b'> ')
io.sendline(coeff_line.encode())
response = io.recvline().decode()
failed = 'failed' in response.lower()
if failed:
candidate_set &= subset
else:
candidate_set -= subset
if not candidate_set:
raise RuntimeError('Candidate set empty after trial %d' % (turn + 1))
remaining = sorted(candidate_set)
print('Remaining candidates:', remaining)
guess = random.choice(remaining)
io.recvuntil(b'Tell your friend the index : ')
io.sendline(str(guess).encode())
return io.recvall().decode()
def run(make_io: Callable[[], object], max_attempts: int) -> None:
context.log_level = 'error'
for attempt in range(1, max_attempts + 1):
print(f'Attempt {attempt}')
io = make_io()
outcome = solve_session(io)
print(outcome)
if 'lost the job' not in outcome.lower():
return
print('All attempts exhausted without success.')
def main() -> None:
parser = argparse.ArgumentParser(description='Solve The Job hash challenge')
parser.add_argument('--remote', nargs=2, metavar=('HOST', 'PORT'), help='Connect to remote service')
parser.add_argument('--attempts', type=int, default=20, help='Maximum attempts before giving up')
args = parser.parse_args()
if args.remote:
host, port = args.remote[0], int(args.remote[1])
def make_io_remote():
return remote(host, port)
run(make_io_remote, args.attempts)
else:
run(lambda: process(['python3', 'server.py']), args.attempts)
if __name__ == '__main__':
main()
>5. Local Validation
- Seed the environment with a placeholder
flag.pyand run the solver locally:
python3 exploit.py --attempts 5
Typical output snippet:
Attempt 1
Remaining candidates: [144, 145, 146, 147]
... lost the job
Attempt 2
Remaining candidates: [180, 181, 182, 183]
You truly are a cryptography genius!
Here is the flag : flag{test_flag}
As expected, each full run narrows the slots to four and guesses randomly until the target is hit.
>6. Remote Execution
After confirming the solver locally, we pointed it at the provided host:
python3 exploit.py --remote remote.infoseciitr.in 4006 --attempts 20
Successful session:
Attempt 5
Remaining candidates: [136, 137, 138, 139]
You truly are a cryptography genius!
Here is the flag : flag{h0w_d1d_h3_b3c0m3_th3_m4n4g3r}
The earlier attempts simply guessed the wrong member of the final 4-slot bucket, but within five tries the correct slot was hit and the flag printed.