1 # Copyright (C) 2003-2007, 2009 Nominum, Inc.
3 # Permission to use, copy, modify, and distribute this software and its
4 # documentation for any purpose with or without fee is hereby granted,
5 # provided that the above copyright notice and this permission notice
6 # appear in all copies.
8 # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
9 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
11 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
14 # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16 """Common DNSSEC-related functions and constants."""
31 class UnsupportedAlgorithm(dns
.exception
.DNSException
):
32 """Raised if an algorithm is not supported."""
35 class ValidationFailure(dns
.exception
.DNSException
):
36 """The DNSSEC signature is invalid."""
52 _algorithm_by_text
= {
58 'DSANSEC3SHA1' : DSANSEC3SHA1
,
59 'RSASHA1NSEC3SHA1' : RSASHA1NSEC3SHA1
,
60 'RSASHA256' : RSASHA256
,
61 'RSASHA512' : RSASHA512
,
62 'INDIRECT' : INDIRECT
,
63 'PRIVATEDNS' : PRIVATEDNS
,
64 'PRIVATEOID' : PRIVATEOID
,
67 # We construct the inverse mapping programmatically to ensure that we
68 # cannot make any mistakes (e.g. omissions, cut-and-paste errors) that
69 # would cause the mapping not to be true inverse.
71 _algorithm_by_value
= dict([(y
, x
) for x
, y
in _algorithm_by_text
.iteritems()])
73 def algorithm_from_text(text
):
74 """Convert text into a DNSSEC algorithm value
77 value
= _algorithm_by_text
.get(text
.upper())
82 def algorithm_to_text(value
):
83 """Convert a DNSSEC algorithm value to text
86 text
= _algorithm_by_value
.get(value
)
91 def _to_rdata(record
, origin
):
92 s
= cStringIO
.StringIO()
93 record
.to_wire(s
, origin
=origin
)
96 def key_id(key
, origin
=None):
97 rdata
= _to_rdata(key
, origin
)
98 if key
.algorithm
== RSAMD5
:
99 return (ord(rdata
[-3]) << 8) + ord(rdata
[-2])
102 for i
in range(len(rdata
) / 2):
103 total
+= (ord(rdata
[2 * i
]) << 8) + ord(rdata
[2 * i
+ 1])
104 if len(rdata
) % 2 != 0:
105 total
+= ord(rdata
[len(rdata
) - 1]) << 8
106 total
+= ((total
>> 16) & 0xffff);
107 return total
& 0xffff
109 def make_ds(name
, key
, algorithm
, origin
=None):
110 if algorithm
.upper() == 'SHA1':
112 hash = dns
.hash.get('SHA1')()
113 elif algorithm
.upper() == 'SHA256':
115 hash = dns
.hash.get('SHA256')()
117 raise UnsupportedAlgorithm
, 'unsupported algorithm "%s"' % algorithm
119 if isinstance(name
, (str, unicode)):
120 name
= dns
.name
.from_text(name
, origin
)
121 hash.update(name
.canonicalize().to_wire())
122 hash.update(_to_rdata(key
, origin
))
123 digest
= hash.digest()
125 dsrdata
= struct
.pack("!HBB", key_id(key
), key
.algorithm
, dsalg
) + digest
126 return dns
.rdata
.from_wire(dns
.rdataclass
.IN
, dns
.rdatatype
.DS
, dsrdata
, 0,
129 def _find_key(keys
, rrsig
):
130 value
= keys
.get(rrsig
.signer
)
133 if isinstance(value
, dns
.node
.Node
):
135 rdataset
= node
.find_rdataset(dns
.rdataclass
.IN
,
136 dns
.rdatatype
.DNSKEY
)
141 for rdata
in rdataset
:
142 if rdata
.algorithm
== rrsig
.algorithm
and \
143 key_id(rdata
) == rrsig
.key_tag
:
147 def _is_rsa(algorithm
):
148 return algorithm
in (RSAMD5
, RSASHA1
,
149 RSASHA1NSEC3SHA1
, RSASHA256
,
152 def _is_dsa(algorithm
):
153 return algorithm
in (DSA
, DSANSEC3SHA1
)
155 def _is_md5(algorithm
):
156 return algorithm
== RSAMD5
158 def _is_sha1(algorithm
):
159 return algorithm
in (DSA
, RSASHA1
,
160 DSANSEC3SHA1
, RSASHA1NSEC3SHA1
)
162 def _is_sha256(algorithm
):
163 return algorithm
== RSASHA256
165 def _is_sha512(algorithm
):
166 return algorithm
== RSASHA512
168 def _make_hash(algorithm
):
169 if _is_md5(algorithm
):
170 return dns
.hash.get('MD5')()
171 if _is_sha1(algorithm
):
172 return dns
.hash.get('SHA1')()
173 if _is_sha256(algorithm
):
174 return dns
.hash.get('SHA256')()
175 if _is_sha512(algorithm
):
176 return dns
.hash.get('SHA512')()
177 raise ValidationFailure
, 'unknown hash for algorithm %u' % algorithm
179 def _make_algorithm_id(algorithm
):
180 if _is_md5(algorithm
):
181 oid
= [0x2a, 0x86, 0x48, 0x86, 0xf7, 0x0d, 0x02, 0x05]
182 elif _is_sha1(algorithm
):
183 oid
= [0x2b, 0x0e, 0x03, 0x02, 0x1a]
184 elif _is_sha256(algorithm
):
185 oid
= [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01]
186 elif _is_sha512(algorithm
):
187 oid
= [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03]
189 raise ValidationFailure
, 'unknown algorithm %u' % algorithm
191 dlen
= _make_hash(algorithm
).digest_size
192 idbytes
= [0x30] + [8 + olen
+ dlen
] + \
193 [0x30, olen
+ 4] + [0x06, olen
] + oid
+ \
194 [0x05, 0x00] + [0x04, dlen
]
195 return ''.join(map(chr, idbytes
))
197 def _validate_rrsig(rrset
, rrsig
, keys
, origin
=None, now
=None):
198 """Validate an RRset against a single signature rdata
200 The owner name of the rrsig is assumed to be the same as the owner name
203 @param rrset: The RRset to validate
204 @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
206 @param rrsig: The signature rdata
207 @type rrsig: dns.rrset.Rdata
208 @param keys: The key dictionary.
209 @type keys: a dictionary keyed by dns.name.Name with node or rdataset values
210 @param origin: The origin to use for relative names
211 @type origin: dns.name.Name or None
212 @param now: The time to use when validating the signatures. The default
217 if isinstance(origin
, (str, unicode)):
218 origin
= dns
.name
.from_text(origin
, dns
.name
.root
)
220 key
= _find_key(keys
, rrsig
)
222 raise ValidationFailure
, 'unknown key'
224 # For convenience, allow the rrset to be specified as a (name, rdataset)
225 # tuple as well as a proper rrset
226 if isinstance(rrset
, tuple):
235 if rrsig
.expiration
< now
:
236 raise ValidationFailure
, 'expired'
237 if rrsig
.inception
> now
:
238 raise ValidationFailure
, 'not yet valid'
240 hash = _make_hash(rrsig
.algorithm
)
242 if _is_rsa(rrsig
.algorithm
):
244 (bytes
,) = struct
.unpack('!B', keyptr
[0:1])
247 (bytes
,) = struct
.unpack('!H', keyptr
[0:2])
249 rsa_e
= keyptr
[0:bytes
]
250 rsa_n
= keyptr
[bytes
:]
251 keylen
= len(rsa_n
) * 8
252 pubkey
= Crypto
.PublicKey
.RSA
.construct(
253 (Crypto
.Util
.number
.bytes_to_long(rsa_n
),
254 Crypto
.Util
.number
.bytes_to_long(rsa_e
)))
255 sig
= (Crypto
.Util
.number
.bytes_to_long(rrsig
.signature
),)
256 elif _is_dsa(rrsig
.algorithm
):
258 (t
,) = struct
.unpack('!B', keyptr
[0:1])
263 dsa_p
= keyptr
[0:octets
]
264 keyptr
= keyptr
[octets
:]
265 dsa_g
= keyptr
[0:octets
]
266 keyptr
= keyptr
[octets
:]
267 dsa_y
= keyptr
[0:octets
]
268 pubkey
= Crypto
.PublicKey
.DSA
.construct(
269 (Crypto
.Util
.number
.bytes_to_long(dsa_y
),
270 Crypto
.Util
.number
.bytes_to_long(dsa_g
),
271 Crypto
.Util
.number
.bytes_to_long(dsa_p
),
272 Crypto
.Util
.number
.bytes_to_long(dsa_q
)))
273 (dsa_r
, dsa_s
) = struct
.unpack('!20s20s', rrsig
.signature
[1:])
274 sig
= (Crypto
.Util
.number
.bytes_to_long(dsa_r
),
275 Crypto
.Util
.number
.bytes_to_long(dsa_s
))
277 raise ValidationFailure
, 'unknown algorithm %u' % rrsig
.algorithm
279 hash.update(_to_rdata(rrsig
, origin
)[:18])
280 hash.update(rrsig
.signer
.to_digestable(origin
))
282 if rrsig
.labels
< len(rrname
) - 1:
283 suffix
= rrname
.split(rrsig
.labels
+ 1)[1]
284 rrname
= dns
.name
.from_text('*', suffix
)
285 rrnamebuf
= rrname
.to_digestable(origin
)
286 rrfixed
= struct
.pack('!HHI', rdataset
.rdtype
, rdataset
.rdclass
,
288 rrlist
= sorted(rdataset
);
290 hash.update(rrnamebuf
)
292 rrdata
= rr
.to_digestable(origin
)
293 rrlen
= struct
.pack('!H', len(rrdata
))
297 digest
= hash.digest()
299 if _is_rsa(rrsig
.algorithm
):
300 # PKCS1 algorithm identifier goop
301 digest
= _make_algorithm_id(rrsig
.algorithm
) + digest
302 padlen
= keylen
/ 8 - len(digest
) - 3
303 digest
= chr(0) + chr(1) + chr(0xFF) * padlen
+ chr(0) + digest
304 elif _is_dsa(rrsig
.algorithm
):
307 # Raise here for code clarity; this won't actually ever happen
308 # since if the algorithm is really unknown we'd already have
309 # raised an exception above
310 raise ValidationFailure
, 'unknown algorithm %u' % rrsig
.algorithm
312 if not pubkey
.verify(digest
, sig
):
313 raise ValidationFailure
, 'verify failure'
315 def _validate(rrset
, rrsigset
, keys
, origin
=None, now
=None):
318 @param rrset: The RRset to validate
319 @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
321 @param rrsigset: The signature RRset
322 @type rrsigset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
324 @param keys: The key dictionary.
325 @type keys: a dictionary keyed by dns.name.Name with node or rdataset values
326 @param origin: The origin to use for relative names
327 @type origin: dns.name.Name or None
328 @param now: The time to use when validating the signatures. The default
333 if isinstance(origin
, (str, unicode)):
334 origin
= dns
.name
.from_text(origin
, dns
.name
.root
)
336 if isinstance(rrset
, tuple):
341 if isinstance(rrsigset
, tuple):
342 rrsigname
= rrsigset
[0]
343 rrsigrdataset
= rrsigset
[1]
345 rrsigname
= rrsigset
.name
346 rrsigrdataset
= rrsigset
348 rrname
= rrname
.choose_relativity(origin
)
349 rrsigname
= rrname
.choose_relativity(origin
)
350 if rrname
!= rrsigname
:
351 raise ValidationFailure
, "owner names do not match"
353 for rrsig
in rrsigrdataset
:
355 _validate_rrsig(rrset
, rrsig
, keys
, origin
, now
)
357 except ValidationFailure
, e
:
359 raise ValidationFailure
, "no RRSIGs validated"
361 def _need_pycrypto(*args
, **kwargs
):
362 raise NotImplementedError, "DNSSEC validation requires pycrypto"
365 import Crypto
.PublicKey
.RSA
366 import Crypto
.PublicKey
.DSA
367 import Crypto
.Util
.number
369 validate_rrsig
= _validate_rrsig
371 validate
= _need_pycrypto
372 validate_rrsig
= _need_pycrypto