-
Notifications
You must be signed in to change notification settings - Fork 41
Expand file tree
/
Copy pathpackets.py
More file actions
152 lines (135 loc) · 5.35 KB
/
Copy pathpackets.py
File metadata and controls
152 lines (135 loc) · 5.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""Regular DNSCrypt query and response packet construction."""
from __future__ import annotations
import os
from typing import Sequence
from certificates import client_pk_len_for_es_version
from constants import (
CLIENT_MAGIC_SIZE,
CLIENT_NONCE_SIZE,
ES_VERSION_XCHACHA20POLY1305,
ES_VERSION_XWING,
MIN_UDP_QUERY_LEN,
NONCE_SIZE,
PUBLIC_KEY_SIZE,
RESOLVER_MAGIC,
RESOLVER_NONCE_SIZE,
TAG_SIZE,
)
from crypto import (
_require_size,
box_xchacha20_shared_key,
pad_7816_4,
query_nonce,
unpad_7816_4,
x25519_public_key,
xchacha20_djb_poly1305_open,
xchacha20_djb_poly1305_seal,
)
from errors import CertificateError, DecryptionError
from protocol_types import DecryptedQuery, PreparedQuery, ResolverCertificate
def encrypt_dnscrypt_query(
certificate,
client_sk: bytes,
client_query: bytes,
client_nonce: bytes | None = None,
min_query_len: int = MIN_UDP_QUERY_LEN,
) -> PreparedQuery:
"""Construct `<dnscrypt-query>` for es-version 0x0002."""
if certificate.es_version != ES_VERSION_XCHACHA20POLY1305:
raise CertificateError("encrypt_dnscrypt_query expects es-version 0x0002")
if client_nonce is None:
client_nonce = os.urandom(CLIENT_NONCE_SIZE)
_require_size("client_nonce", client_nonce, CLIENT_NONCE_SIZE)
client_pk = x25519_public_key(client_sk)
shared_key = box_xchacha20_shared_key(client_sk, certificate.resolver_pk)
nonce = query_nonce(client_nonce)
plaintext = pad_7816_4(client_query, min_query_len)
encrypted_query = xchacha20_djb_poly1305_seal(shared_key, nonce, plaintext)
return PreparedQuery(
dnscrypt_query=certificate.client_magic
+ client_pk
+ client_nonce
+ encrypted_query,
shared_key=shared_key,
client_pk=client_pk,
client_nonce=client_nonce,
)
def decrypt_dnscrypt_query(
dnscrypt_query: bytes,
resolver_certificates: Sequence[ResolverCertificate],
) -> DecryptedQuery:
"""Open `<dnscrypt-query>` at a resolver."""
if len(dnscrypt_query) < CLIENT_MAGIC_SIZE + PUBLIC_KEY_SIZE + CLIENT_NONCE_SIZE:
raise DecryptionError("query is too short")
client_magic = dnscrypt_query[:CLIENT_MAGIC_SIZE]
match = next(
(
rc
for rc in resolver_certificates
if rc.certificate.client_magic == client_magic
),
None,
)
if match is None:
raise DecryptionError("client-magic does not match any certificate")
certificate = match.certificate
client_pk_len = client_pk_len_for_es_version(certificate.es_version)
header_len = CLIENT_MAGIC_SIZE + client_pk_len + CLIENT_NONCE_SIZE
if len(dnscrypt_query) < header_len + TAG_SIZE:
raise DecryptionError("query is truncated")
client_pk = dnscrypt_query[CLIENT_MAGIC_SIZE : CLIENT_MAGIC_SIZE + client_pk_len]
client_nonce = dnscrypt_query[CLIENT_MAGIC_SIZE + client_pk_len : header_len]
encrypted_query = dnscrypt_query[header_len:]
if certificate.es_version == ES_VERSION_XCHACHA20POLY1305:
shared_key = box_xchacha20_shared_key(match.resolver_sk, client_pk)
elif certificate.es_version == ES_VERSION_XWING:
from pq import pq_shared_key, xwing_decapsulate
kem_ss = xwing_decapsulate(encrypted_kem=client_pk, secret_seed=match.resolver_sk)
shared_key = pq_shared_key(certificate, kem_ss, client_pk)
else:
raise CertificateError("unsupported certificate")
plaintext = xchacha20_djb_poly1305_open(
shared_key, query_nonce(client_nonce), encrypted_query
)
return DecryptedQuery(
client_query=unpad_7816_4(plaintext),
shared_key=shared_key,
client_pk=client_pk,
client_nonce=client_nonce,
certificate=certificate,
)
def encrypt_dnscrypt_response(
resolver_response: bytes,
shared_key: bytes,
client_nonce: bytes,
resolver_nonce: bytes | None = None,
minimum_length: int = 64,
) -> bytes:
"""Construct `<dnscrypt-response>` for regular DNSCrypt."""
if resolver_nonce is None:
resolver_nonce = os.urandom(RESOLVER_NONCE_SIZE)
_require_size("client_nonce", client_nonce, CLIENT_NONCE_SIZE)
_require_size("resolver_nonce", resolver_nonce, RESOLVER_NONCE_SIZE)
nonce = client_nonce + resolver_nonce
plaintext = pad_7816_4(resolver_response, minimum_length)
encrypted_response = xchacha20_djb_poly1305_seal(shared_key, nonce, plaintext)
return RESOLVER_MAGIC + nonce + encrypted_response
def decrypt_dnscrypt_response(
dnscrypt_response: bytes,
shared_key: bytes,
expected_client_nonce: bytes,
) -> bytes:
"""Open `<dnscrypt-response>` and return `<resolver-response>`."""
_require_size("expected_client_nonce", expected_client_nonce, CLIENT_NONCE_SIZE)
header_len = len(RESOLVER_MAGIC) + NONCE_SIZE
if len(dnscrypt_response) < header_len + TAG_SIZE:
raise DecryptionError("response is too short")
if dnscrypt_response[: len(RESOLVER_MAGIC)] != RESOLVER_MAGIC:
raise DecryptionError("invalid resolver magic")
nonce = dnscrypt_response[len(RESOLVER_MAGIC) : header_len]
if nonce[:CLIENT_NONCE_SIZE] != expected_client_nonce:
raise DecryptionError("response client-nonce does not match query")
plaintext = xchacha20_djb_poly1305_open(
shared_key, nonce, dnscrypt_response[header_len:]
)
return unpad_7816_4(plaintext)