3 # This Source Code Form is subject to the terms of the Mozilla Public
4 # License, v. 2.0. If a copy of the MPL was not distributed with this
5 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 Reads a certificate specification from stdin or a file and outputs a
9 signed x509 certificate with the desired properties.
11 The input format is as follows:
13 issuer:<issuer distinguished name specification>
14 subject:<subject distinguished name specification>
16 [validity:<YYYYMMDD-YYYYMMDD|duration in days>]
17 [issuerKey:<key specification>]
18 [subjectKey:<key specification>]
19 [signature:{sha256WithRSAEncryption,sha1WithRSAEncryption,
20 md5WithRSAEncryption,ecdsaWithSHA256,ecdsaWithSHA384,
22 [serialNumber:<integer in the interval [1, 127]>]
23 [extension:<extension name:<extension-specific data>>]
27 basicConstraints:[cA],[pathLenConstraint]
28 keyUsage:[digitalSignature,nonRepudiation,keyEncipherment,
29 dataEncipherment,keyAgreement,keyCertSign,cRLSign]
extKeyUsage:[serverAuth,clientAuth,codeSigning,emailProtection,
             nsSGC, # Netscape Server Gated Crypto
             OCSPSigning,timeStamping]
33 subjectAlternativeName:[<dNSName|directoryName|"ip4:"iPV4Address>,...]
34 authorityInformationAccess:<OCSP URI>
35 certificatePolicies:[<policy OID>,...]
36 nameConstraints:{permitted,excluded}:[<dNSName|directoryName>,...]
38 TLSFeature:[<TLSFeature>,...]
39 embeddedSCTList:[<key specification>:<YYYYMMDD>,...]
43 [] indicates an optional field or component of a field
44 <> indicates a required component of a field
45 {} indicates a choice of exactly one value among a set of values
46 [a,b,c] indicates a list of potential values, of which zero or more
49 For instance, the version field is optional. However, if it is
50 specified, it must have exactly one value from the set {1,2,3,4}.
52 Most fields have reasonable default values. By default one shared RSA
53 key is used for all signatures and subject public key information
fields. Using "issuerKey:<key specification>" or
"subjectKey:<key specification>" causes a different key to be used for
signing or as the subject public key information field, respectively.
57 See pykey.py for the list of available specifications.
58 The signature algorithm is sha256WithRSAEncryption by default.
60 The validity period may be specified as either concrete notBefore and
61 notAfter values or as a validity period centered around 'now'. For the
62 latter, this will result in a notBefore of 'now' - duration/2 and a
63 notAfter of 'now' + duration/2.
65 Issuer and subject distinguished name specifications are of the form
66 '[stringEncoding]/C=XX/O=Example/CN=example.com'. C (country name), ST
67 (state or province name), L (locality name), O (organization name), OU
68 (organizational unit name), CN (common name) and emailAddress (email
69 address) are currently supported. The optional stringEncoding field may
70 be 'utf8String' or 'printableString'. If the given string does not
71 contain a '/', it is assumed to represent a common name. If an empty
72 string is provided, then an empty distinguished name is returned.
73 DirectoryNames also use this format. When specifying a directoryName in
74 a nameConstraints extension, the implicit form may not be used.
76 If an extension name has '[critical]' after it, it will be marked as
77 critical. Otherwise (by default), it will not be marked as critical.
79 TLSFeature values can either consist of a named value (currently only
80 'OCSPMustStaple' which corresponds to status_request) or a numeric TLS
81 feature value (see rfc7633 for more information).
83 If a serial number is not explicitly specified, it is automatically
84 generated based on the contents of the certificate.
93 from struct
import pack
98 from pyasn1
.codec
.der
import decoder
, encoder
99 from pyasn1
.type import constraint
, tag
, univ
, useful
100 from pyasn1_modules
import rfc2459
class Error(Exception):
    """Root of the exception hierarchy defined by this module."""
class UnknownBaseError(Error):
    """Base class for handling unexpected input in this module.

    Subclasses override ``category`` so that the rendered message names
    the kind of input that was not recognized.
    """

    def __init__(self, value):
        super(UnknownBaseError, self).__init__()
        # The offending input; __str__ reads it when rendering the message.
        self.value = value
        self.category = "input"

    def __str__(self):
        return 'Unknown %s type "%s"' % (self.category, repr(self.value))
class UnknownAlgorithmTypeError(UnknownBaseError):
    """Raised when a signature algorithm name is not recognized."""

    def __init__(self, value):
        super(UnknownAlgorithmTypeError, self).__init__(value)
        self.category = "algorithm"
class UnknownParameterTypeError(UnknownBaseError):
    """Raised when a specification line names an unknown parameter."""

    def __init__(self, value):
        super(UnknownParameterTypeError, self).__init__(value)
        self.category = "parameter"
class UnknownExtensionTypeError(UnknownBaseError):
    """Raised when an extension specification is not recognized."""

    def __init__(self, value):
        super(UnknownExtensionTypeError, self).__init__(value)
        self.category = "extension"
class UnknownKeyPurposeTypeError(UnknownBaseError):
    """Raised when an extended key usage purpose is not recognized."""

    def __init__(self, value):
        super(UnknownKeyPurposeTypeError, self).__init__(value)
        self.category = "keyPurpose"
class UnknownKeyTargetError(UnknownBaseError):
    """Raised when a key is requested for an unknown target."""

    def __init__(self, value):
        super(UnknownKeyTargetError, self).__init__(value)
        self.category = "key target"
class UnknownVersionError(UnknownBaseError):
    """Raised when the specified certificate version is not supported."""

    def __init__(self, value):
        super(UnknownVersionError, self).__init__(value)
        self.category = "version"
class UnknownNameConstraintsSpecificationError(UnknownBaseError):
    """Helper exception type to handle unknown specified
    nameConstraints."""

    def __init__(self, value):
        UnknownBaseError.__init__(self, value)
        self.category = "nameConstraints specification"
class UnknownDNTypeError(UnknownBaseError):
    """Helper exception type to handle unknown DN types."""

    def __init__(self, value):
        UnknownBaseError.__init__(self, value)
        # Every sibling exception names its own category; without this the
        # message would fall back to the base class's generic "input".
        self.category = "DN"
class UnknownNSCertTypeError(UnknownBaseError):
    """Raised when an nsCertType value is not recognized."""

    def __init__(self, value):
        super(UnknownNSCertTypeError, self).__init__(value)
        self.category = "nsCertType"
class UnknownTLSFeature(UnknownBaseError):
    """Raised when a TLS Feature value is not recognized."""

    def __init__(self, value):
        super(UnknownTLSFeature, self).__init__(value)
        self.category = "TLSFeature"
class UnknownDelegatedCredentialError(UnknownBaseError):
    """Raised when Delegated Credential arguments are not recognized."""

    def __init__(self, value):
        super(UnknownDelegatedCredentialError, self).__init__(value)
        self.category = "delegatedCredential"
class InvalidSCTSpecification(Error):
    """Helper exception type to handle invalid SCT specifications."""

    def __init__(self, value):
        super(InvalidSCTSpecification, self).__init__()
        # The rejected SCT specification text, echoed back by __str__.
        self.value = value

    def __str__(self):
        # The template uses a "{}" placeholder, so it must be filled with
        # str.format; the "%" operator would raise TypeError for a string
        # value because the template has no %-style conversion.
        return repr('invalid SCT specification "{}"'.format(self.value))
class InvalidSerialNumber(Error):
    """Exception type to handle invalid serial numbers."""

    def __init__(self, value):
        super(InvalidSerialNumber, self).__init__()
        # The rejected serial number (or a description of the problem).
        self.value = value

    def __str__(self):
        return repr(self.value)
def getASN1Tag(asn1Type):
    """Return the tag id of the base tag of the given pyasn1 type."""
    baseTag = asn1Type.tagSet.baseTag
    return baseTag.tagId
def stringToAccessDescription(string):
    """Helper function that takes a string representing a URI
    presumably identifying an OCSP authority information access
    location. Returns an AccessDescription usable by pyasn1."""
    accessMethod = rfc2459.id_ad_ocsp
    accessLocation = rfc2459.GeneralName()
    accessLocation["uniformResourceIdentifier"] = string
    # Position 0 holds the access method OID, position 1 the location.
    sequence = univ.Sequence()
    sequence.setComponentByPosition(0, accessMethod)
    sequence.setComponentByPosition(1, accessLocation)
    return sequence
def stringToDN(string, tag=None):
    """Takes a string representing a distinguished name or directory
    name and returns a Name for use by pyasn1. See the documentation
    for the issuer and subject fields for more details. Takes an
    optional implicit tag in cases where the Name needs to be tagged
    differently.

    A non-empty string without a '/' is treated as a bare common name.
    Raises UnknownDNTypeError for an attribute type that is not handled.
    """
    if string and "/" not in string:
        string = "/CN=%s" % string
    rdns = rfc2459.RDNSequence()
    pattern = "/(C|ST|L|O|OU|CN|emailAddress)="
    split = re.split(pattern, string)
    # split should now be [[encoding], <type>, <value>, <type>, <value>, ...]
    encoding = split[0] if split[0] else "utf8String"
    # Attribute types whose value is stored under the chosen string
    # encoding: (attribute OID, pyasn1 type to instantiate).
    directoryStringTypes = {
        "ST": (rfc2459.id_at_stateOrProvinceName, rfc2459.X520StateOrProvinceName),
        "L": (rfc2459.id_at_localityName, rfc2459.X520LocalityName),
        "O": (rfc2459.id_at_organizationName, rfc2459.X520OrganizationName),
        "OU": (
            rfc2459.id_at_organizationalUnitName,
            rfc2459.X520OrganizationalUnitName,
        ),
        "CN": (rfc2459.id_at_commonName, rfc2459.X520CommonName),
    }
    for pos, (nameType, value) in enumerate(zip(split[1::2], split[2::2])):
        ava = rfc2459.AttributeTypeAndValue()
        if nameType == "C":
            # Country names are constructed directly from the value.
            ava["type"] = rfc2459.id_at_countryName
            nameComponent = rfc2459.X520countryName(value)
        elif nameType == "emailAddress":
            # Email addresses are constructed directly from the value.
            ava["type"] = rfc2459.emailAddress
            nameComponent = rfc2459.Pkcs9email(value)
        elif nameType in directoryStringTypes:
            attributeOID, componentType = directoryStringTypes[nameType]
            ava["type"] = attributeOID
            nameComponent = componentType()
            # The value may have things like '\0' (i.e. a backslash
            # followed by the number zero) that have to be decoded into
            # the resulting '\x00' (i.e. a byte with value zero).
            nameComponent[encoding] = six.ensure_binary(value).decode(
                encoding="unicode_escape"
            )
        else:
            raise UnknownDNTypeError(nameType)
        ava["value"] = nameComponent
        rdn = rfc2459.RelativeDistinguishedName()
        rdn.setComponentByPosition(0, ava)
        rdns.setComponentByPosition(pos, rdn)
    if tag:
        name = rfc2459.Name().subtype(implicitTag=tag)
    else:
        name = rfc2459.Name()
    name.setComponentByPosition(0, rdns)
    return name
def stringToAlgorithmIdentifiers(string):
    """Helper function that converts a description of an algorithm
    to a representation usable by the pyasn1 package and a hash
    algorithm constant for use by pykey.

    Returns a tuple (AlgorithmIdentifier, pykey hash constant).
    Raises UnknownAlgorithmTypeError for an unrecognized name.
    """
    algorithmIdentifier = rfc2459.AlgorithmIdentifier()
    # We add Null parameters for RSA only
    addParameters = False
    if string == "sha1WithRSAEncryption":
        algorithmType = pykey.HASH_SHA1
        algorithm = rfc2459.sha1WithRSAEncryption
        addParameters = True
    elif string == "sha256WithRSAEncryption":
        algorithmType = pykey.HASH_SHA256
        algorithm = univ.ObjectIdentifier("1.2.840.113549.1.1.11")
        addParameters = True
    elif string == "md5WithRSAEncryption":
        algorithmType = pykey.HASH_MD5
        algorithm = rfc2459.md5WithRSAEncryption
        addParameters = True
    elif string == "ecdsaWithSHA256":
        algorithmType = pykey.HASH_SHA256
        algorithm = univ.ObjectIdentifier("1.2.840.10045.4.3.2")
    elif string == "ecdsaWithSHA384":
        algorithmType = pykey.HASH_SHA384
        algorithm = univ.ObjectIdentifier("1.2.840.10045.4.3.3")
    elif string == "ecdsaWithSHA512":
        algorithmType = pykey.HASH_SHA512
        algorithm = univ.ObjectIdentifier("1.2.840.10045.4.3.4")
    else:
        raise UnknownAlgorithmTypeError(string)
    algorithmIdentifier["algorithm"] = algorithm
    if addParameters:
        # Directly setting parameters to univ.Null doesn't currently work.
        nullEncapsulated = encoder.encode(univ.Null())
        algorithmIdentifier["parameters"] = univ.Any(nullEncapsulated)
    return (algorithmIdentifier, algorithmType)
def datetimeToTime(dt):
    """Takes a datetime object and returns an rfc2459.Time object with
    that time as its value as a GeneralizedTime"""
    time = rfc2459.Time()
    # Formatted as YYYYMMDDHHMMSS followed by a literal "Z".
    time["generalTime"] = useful.GeneralizedTime(dt.strftime("%Y%m%d%H%M%SZ"))
    return time
def serialBytesToString(serialBytes):
    """Encode a list of byte values (each in [0, 255]) as a serial
    number: the ASN.1 INTEGER tag, a one-byte length, then the bytes.

    Raises InvalidSerialNumber when more than 127 bytes are given.
    """
    byteCount = len(serialBytes)
    if byteCount > 127:
        raise InvalidSerialNumber("{} bytes is too long".format(byteCount))
    # Tag and length header, followed by the content bytes.
    header = [getASN1Tag(univ.Integer), byteCount]
    return bytes(header + serialBytes)
class Certificate(object):
    """Utility class for reading a certificate specification and
    generating a signed x509 certificate"""

    def __init__(self, paramStream):
        """Read the specification from paramStream (an object providing
        readlines()) and populate this certificate's fields."""
        self.versionValue = 2  # a value of 2 is X509v3
        self.signature = "sha256WithRSAEncryption"
        self.issuer = "Default Issuer"
        actualNow = datetime.datetime.utcnow()
        # 'now' is pinned to the very start of the current year.
        self.now = datetime.datetime.strptime(str(actualNow.year), "%Y")
        aYearAndAWhile = datetime.timedelta(days=400)
        self.notBefore = self.now - aYearAndAWhile
        self.notAfter = self.now + aYearAndAWhile
        self.subject = "Default Subject"
        self.extensions = None
        # The serial number can be automatically generated from the
        # certificate specification. We need this value to depend in
        # part on what extensions are present. self.extensions are
        # pyasn1 objects. Depending on the string representation of
        # these objects can cause the resulting serial number to change
        # unexpectedly, so instead we depend on the original string
        # representation of the extensions as specified.
        self.extensionLines = None
        self.savedEmbeddedSCTListData = None
        self.subjectKey = pykey.keyFromSpecification("default")
        self.issuerKey = pykey.keyFromSpecification("default")
        self.serialNumber = None
        self.decodeParams(paramStream)
        # If a serial number wasn't specified, generate one based on
        # the certificate contents.
        if not self.serialNumber:
            self.serialNumber = self.generateSerialNumber()
        # This has to be last because the SCT signature depends on the
        # contents of the certificate.
        if self.savedEmbeddedSCTListData:
            self.addEmbeddedSCTListData()

    def generateSerialNumber(self):
        """Generates a serial number for this certificate based on its
        contents, so that the same specification always yields the same
        serial number."""
        hasher = hashlib.sha256()
        hasher.update(six.ensure_binary(str(self.versionValue)))
        hasher.update(six.ensure_binary(self.signature))
        hasher.update(six.ensure_binary(self.issuer))
        hasher.update(six.ensure_binary(str(self.notBefore)))
        hasher.update(six.ensure_binary(str(self.notAfter)))
        hasher.update(six.ensure_binary(self.subject))
        if self.extensionLines:
            for extensionLine in self.extensionLines:
                hasher.update(six.ensure_binary(extensionLine))
        if self.savedEmbeddedSCTListData:
            # savedEmbeddedSCTListData is
            # (embeddedSCTListSpecification, critical), where |critical|
            # is either the string '[critical]' or None.
            hasher.update(six.ensure_binary(self.savedEmbeddedSCTListData[0]))
            if self.savedEmbeddedSCTListData[1]:
                hasher.update(six.ensure_binary(self.savedEmbeddedSCTListData[1]))
        serialBytes = list(hasher.digest()[:20])
        # Ensure that the most significant bit isn't set (which would
        # indicate a negative number, which isn't valid for serial
        # numbers).
        serialBytes[0] &= 0x7F
        # Also ensure that the least significant bit on the most
        # significant byte is set (to prevent a leading zero byte,
        # which also wouldn't be valid).
        serialBytes[0] |= 0x01
        return serialBytesToString(serialBytes)

    def decodeParams(self, paramStream):
        """Decode every line of paramStream as one parameter."""
        for line in paramStream.readlines():
            self.decodeParam(line.strip())

    def decodeParam(self, line):
        """Decode a single '<param>:<value>' line and apply it.

        Raises UnknownParameterTypeError for an unrecognized parameter and
        InvalidSerialNumber for a serial number outside [1, 127].
        """
        # Only the first ':' separates the name; the value may contain more.
        param, _, value = line.partition(":")
        if param == "version":
            self.setVersion(value)
        elif param == "subject":
            self.subject = value
        elif param == "issuer":
            self.issuer = value
        elif param == "validity":
            self.decodeValidity(value)
        elif param == "extension":
            self.decodeExtension(value)
        elif param == "issuerKey":
            self.setupKey("issuer", value)
        elif param == "subjectKey":
            self.setupKey("subject", value)
        elif param == "signature":
            self.signature = value
        elif param == "serialNumber":
            serialNumber = int(value)
            # Ensure only serial numbers that conform to the rules listed in
            # generateSerialNumber() are permitted.
            if serialNumber < 1 or serialNumber > 127:
                raise InvalidSerialNumber(value)
            self.serialNumber = serialBytesToString([serialNumber])
        else:
            raise UnknownParameterTypeError(param)

    def setVersion(self, version):
        """Store the 1-based version (1..4) as the 0-based encoded value.

        Raises UnknownVersionError for a version outside that range.
        """
        intVersion = int(version)
        if intVersion >= 1 and intVersion <= 4:
            self.versionValue = intVersion - 1
        else:
            raise UnknownVersionError(version)

    def decodeValidity(self, duration):
        """Set notBefore/notAfter from either 'YYYYMMDD-YYYYMMDD' or a
        number of days centered around self.now."""
        match = re.search("([0-9]{8})-([0-9]{8})", duration)
        if match:
            self.notBefore = datetime.datetime.strptime(match.group(1), "%Y%m%d")
            self.notAfter = datetime.datetime.strptime(match.group(2), "%Y%m%d")
        else:
            delta = datetime.timedelta(days=(int(duration) / 2))
            self.notBefore = self.now - delta
            self.notAfter = self.now + delta

    def decodeExtension(self, extension):
        """Parse '<name>[[critical]]:<data>' and add the named extension.

        Raises UnknownExtensionTypeError when the line cannot be parsed or
        names an extension that is not handled.
        """
        match = re.search(r"([a-zA-Z]+)(\[critical\])?:(.*)", extension)
        if not match:
            raise UnknownExtensionTypeError(extension)
        extensionType = match.group(1)
        critical = match.group(2)
        value = match.group(3)
        if extensionType == "basicConstraints":
            self.addBasicConstraints(value, critical)
        elif extensionType == "keyUsage":
            self.addKeyUsage(value, critical)
        elif extensionType == "extKeyUsage":
            self.addExtKeyUsage(value, critical)
        elif extensionType == "subjectAlternativeName":
            self.addSubjectAlternativeName(value, critical)
        elif extensionType == "authorityInformationAccess":
            self.addAuthorityInformationAccess(value, critical)
        elif extensionType == "certificatePolicies":
            self.addCertificatePolicies(value, critical)
        elif extensionType == "nameConstraints":
            self.addNameConstraints(value, critical)
        elif extensionType == "nsCertType":
            self.addNSCertType(value, critical)
        elif extensionType == "TLSFeature":
            self.addTLSFeature(value, critical)
        elif extensionType == "embeddedSCTList":
            # Deferred: __init__ adds it once everything else is decoded.
            self.savedEmbeddedSCTListData = (value, critical)
        elif extensionType == "delegationUsage":
            self.addDelegationUsage(critical)
        else:
            raise UnknownExtensionTypeError(extensionType)

        if extensionType != "embeddedSCTList":
            if not self.extensionLines:
                self.extensionLines = []
            self.extensionLines.append(extension)

    def setupKey(self, subjectOrIssuer, value):
        """Replace the subject or issuer key with one built from value.

        Raises UnknownKeyTargetError for any other target name.
        """
        if subjectOrIssuer == "subject":
            self.subjectKey = pykey.keyFromSpecification(value)
        elif subjectOrIssuer == "issuer":
            self.issuerKey = pykey.keyFromSpecification(value)
        else:
            raise UnknownKeyTargetError(subjectOrIssuer)

    def addExtension(self, extensionType, extensionValue, critical):
        """Wrap extensionValue in an Extension with the given OID and
        append it to self.extensions."""
        if not self.extensions:
            self.extensions = []
        encapsulated = univ.OctetString(encoder.encode(extensionValue))
        extension = rfc2459.Extension()
        extension["extnID"] = extensionType
        # critical is either the string '[critical]' or None.
        # We only care whether or not it is truthy.
        if critical:
            extension["critical"] = True
        extension["extnValue"] = encapsulated
        self.extensions.append(extension)

    def addBasicConstraints(self, basicConstraints, critical):
        """Add a basicConstraints extension from '[cA],[pathLenConstraint]'."""
        parts = basicConstraints.split(",")
        cA = parts[0]
        # Both components are optional, so tolerate a value with no comma
        # (e.g. just "cA") rather than failing with IndexError.
        pathLenConstraint = parts[1] if len(parts) > 1 else ""
        basicConstraintsExtension = rfc2459.BasicConstraints()
        basicConstraintsExtension["cA"] = cA == "cA"
        if pathLenConstraint:
            pathLenConstraintValue = univ.Integer(int(pathLenConstraint)).subtype(
                subtypeSpec=constraint.ValueRangeConstraint(0, float("inf"))
            )
            basicConstraintsExtension["pathLenConstraint"] = pathLenConstraintValue
        self.addExtension(
            rfc2459.id_ce_basicConstraints, basicConstraintsExtension, critical
        )

    def addKeyUsage(self, keyUsage, critical):
        """Add a keyUsage extension built from the given specification."""
        keyUsageExtension = rfc2459.KeyUsage(keyUsage)
        self.addExtension(rfc2459.id_ce_keyUsage, keyUsageExtension, critical)

    def keyPurposeToOID(self, keyPurpose):
        """Map a key purpose name to its object identifier.

        Raises UnknownKeyPurposeTypeError for an unrecognized name.
        """
        if keyPurpose == "serverAuth":
            return rfc2459.id_kp_serverAuth
        if keyPurpose == "clientAuth":
            return rfc2459.id_kp_clientAuth
        if keyPurpose == "codeSigning":
            return rfc2459.id_kp_codeSigning
        if keyPurpose == "emailProtection":
            return rfc2459.id_kp_emailProtection
        if keyPurpose == "nsSGC":
            return univ.ObjectIdentifier("2.16.840.1.113730.4.1")
        if keyPurpose == "OCSPSigning":
            return univ.ObjectIdentifier("1.3.6.1.5.5.7.3.9")
        if keyPurpose == "timeStamping":
            return rfc2459.id_kp_timeStamping
        raise UnknownKeyPurposeTypeError(keyPurpose)

    def addExtKeyUsage(self, extKeyUsage, critical):
        """Add an extKeyUsage extension from a comma-separated list of
        key purpose names."""
        extKeyUsageExtension = rfc2459.ExtKeyUsageSyntax()
        for count, keyPurpose in enumerate(extKeyUsage.split(",")):
            extKeyUsageExtension.setComponentByPosition(
                count, self.keyPurposeToOID(keyPurpose)
            )
        self.addExtension(rfc2459.id_ce_extKeyUsage, extKeyUsageExtension, critical)

    def addSubjectAlternativeName(self, names, critical):
        """Add a subjectAlternativeName extension from a comma-separated
        list. An entry containing '/' is a directoryName, one containing
        '@' an rfc822Name, one prefixed 'ip4:' an IPv4 address, and
        anything else a dNSName."""
        IPV4_PREFIX = "ip4:"

        subjectAlternativeName = rfc2459.SubjectAltName()
        for count, name in enumerate(names.split(",")):
            generalName = rfc2459.GeneralName()
            if "/" in name:
                directoryName = stringToDN(
                    name, tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4)
                )
                generalName["directoryName"] = directoryName
            elif "@" in name:
                generalName["rfc822Name"] = name
            elif name.startswith(IPV4_PREFIX):
                generalName["iPAddress"] = socket.inet_pton(
                    socket.AF_INET, name[len(IPV4_PREFIX) :]
                )
            else:
                # The string may have things like '\0' (i.e. a backslash
                # followed by the number zero) that have to be decoded into
                # the resulting '\x00' (i.e. a byte with value zero).
                generalName["dNSName"] = six.ensure_binary(name).decode(
                    "unicode_escape"
                )
            subjectAlternativeName.setComponentByPosition(count, generalName)
        self.addExtension(
            rfc2459.id_ce_subjectAltName, subjectAlternativeName, critical
        )

    def addAuthorityInformationAccess(self, ocspURI, critical):
        """Add an authorityInformationAccess extension holding a single
        access description for the given URI."""
        sequence = univ.Sequence()
        accessDescription = stringToAccessDescription(ocspURI)
        sequence.setComponentByPosition(0, accessDescription)
        self.addExtension(rfc2459.id_pe_authorityInfoAccess, sequence, critical)

    def addCertificatePolicies(self, policyOIDs, critical):
        """Add a certificatePolicies extension from a comma-separated list
        of policy OIDs; 'any' is shorthand for 2.5.29.32.0."""
        policies = rfc2459.CertificatePolicies()
        for pos, policyOID in enumerate(policyOIDs.split(",")):
            if policyOID == "any":
                policyOID = "2.5.29.32.0"
            policy = rfc2459.PolicyInformation()
            policyIdentifier = rfc2459.CertPolicyId(policyOID)
            policy["policyIdentifier"] = policyIdentifier
            policies.setComponentByPosition(pos, policy)
        self.addExtension(rfc2459.id_ce_certificatePolicies, policies, critical)

    def addNameConstraints(self, constraints, critical):
        """Add a nameConstraints extension from
        '{permitted,excluded}:<name>,...'.

        Raises UnknownNameConstraintsSpecificationError when the value
        starts with neither 'permitted:' nor 'excluded:'.
        """
        nameConstraints = rfc2459.NameConstraints()
        if constraints.startswith("permitted:"):
            (subtreesType, subtreesTag) = ("permittedSubtrees", 0)
        elif constraints.startswith("excluded:"):
            (subtreesType, subtreesTag) = ("excludedSubtrees", 1)
        else:
            raise UnknownNameConstraintsSpecificationError(constraints)
        generalSubtrees = rfc2459.GeneralSubtrees().subtype(
            implicitTag=tag.Tag(
                tag.tagClassContext, tag.tagFormatConstructed, subtreesTag
            )
        )
        subtrees = constraints[(constraints.find(":") + 1) :]
        for pos, name in enumerate(subtrees.split(",")):
            generalName = rfc2459.GeneralName()
            if "/" in name:
                directoryName = stringToDN(
                    name, tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4)
                )
                generalName["directoryName"] = directoryName
            else:
                generalName["dNSName"] = name
            generalSubtree = rfc2459.GeneralSubtree()
            generalSubtree["base"] = generalName
            generalSubtrees.setComponentByPosition(pos, generalSubtree)
        nameConstraints[subtreesType] = generalSubtrees
        self.addExtension(rfc2459.id_ce_nameConstraints, nameConstraints, critical)

    def addNSCertType(self, certType, critical):
        """Add an nsCertType extension; only 'sslServer' is accepted.

        Raises UnknownNSCertTypeError for any other value.
        """
        if certType != "sslServer":
            raise UnknownNSCertTypeError(certType)
        self.addExtension(
            univ.ObjectIdentifier("2.16.840.1.113730.1.1"),
            univ.BitString("'01'B"),
            critical,
        )

    def addDelegationUsage(self, critical):
        """Add the delegationUsage extension.

        Raises UnknownDelegatedCredentialError if it is marked critical.
        """
        if critical:
            raise UnknownDelegatedCredentialError(critical)
        self.addExtension(
            univ.ObjectIdentifier("1.3.6.1.4.1.44363.44"), univ.Null(), critical
        )

    def addTLSFeature(self, features, critical):
        """Add a TLSFeature extension from a comma-separated list of
        numeric values or known feature names.

        Raises UnknownTLSFeature for an entry that is neither.
        """
        namedFeatures = {"OCSPMustStaple": 5}
        featureList = [f.strip() for f in features.split(",")]
        sequence = univ.Sequence()
        for pos, feature in enumerate(featureList):
            try:
                featureValue = int(feature)
            except ValueError:
                # Not numeric, so it must be one of the named features.
                try:
                    featureValue = namedFeatures[feature]
                except KeyError:
                    raise UnknownTLSFeature(feature)
            sequence.setComponentByPosition(pos, univ.Integer(featureValue))
        self.addExtension(
            univ.ObjectIdentifier("1.3.6.1.5.5.7.1.24"), sequence, critical
        )

    def addEmbeddedSCTListData(self):
        """Sign one SCT per '<key specification>:<YYYYMMDD>' entry saved
        by decodeExtension and add them as a single extension.

        Raises InvalidSCTSpecification for a malformed entry.
        """
        (scts, critical) = self.savedEmbeddedSCTListData
        encodedSCTs = []
        for sctSpec in scts.split(","):
            match = re.search(r"(\w+):(\d{8})", sctSpec)
            if not match:
                raise InvalidSCTSpecification(sctSpec)
            keySpec = match.group(1)
            key = pykey.keyFromSpecification(keySpec)
            time = datetime.datetime.strptime(match.group(2), "%Y%m%d")
            tbsCertificate = self.getTBSCertificate()
            tbsDER = encoder.encode(tbsCertificate)
            sct = pyct.SCT(key, time, pyct.PrecertEntry(tbsDER, self.issuerKey))
            signed = sct.signAndEncode()
            # Each SCT is prefixed with its 16-bit big-endian length.
            lengthPrefix = pack("!H", len(signed))
            encodedSCTs.append(lengthPrefix + signed)
        encodedSCTBytes = b"".join(encodedSCTs)
        # The whole list is prefixed with its own 16-bit length as well.
        lengthPrefix = pack("!H", len(encodedSCTBytes))
        extensionBytes = lengthPrefix + encodedSCTBytes
        self.addExtension(
            univ.ObjectIdentifier("1.3.6.1.4.1.11129.2.4.2"),
            univ.OctetString(extensionBytes),
            critical,
        )

    def getVersion(self):
        """Return the version as an rfc2459.Version with explicit tag 0."""
        return rfc2459.Version(self.versionValue).subtype(
            explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)
        )

    def getSerialNumber(self):
        """Decode the stored serial number bytes into a pyasn1 value."""
        return decoder.decode(self.serialNumber)[0]

    def getIssuer(self):
        """Return the issuer as a Name built by stringToDN."""
        return stringToDN(self.issuer)

    def getValidity(self):
        """Return an rfc2459.Validity holding notBefore and notAfter."""
        validity = rfc2459.Validity()
        validity["notBefore"] = self.getNotBefore()
        validity["notAfter"] = self.getNotAfter()
        return validity

    def getNotBefore(self):
        """Return notBefore converted by datetimeToTime."""
        return datetimeToTime(self.notBefore)

    def getNotAfter(self):
        """Return notAfter converted by datetimeToTime."""
        return datetimeToTime(self.notAfter)

    def getSubject(self):
        """Return the subject as a Name built by stringToDN."""
        return stringToDN(self.subject)

    def getTBSCertificate(self):
        """Assemble the rfc2459.TBSCertificate from the current fields."""
        (signatureOID, _) = stringToAlgorithmIdentifiers(self.signature)
        tbsCertificate = rfc2459.TBSCertificate()
        tbsCertificate["version"] = self.getVersion()
        tbsCertificate["serialNumber"] = self.getSerialNumber()
        tbsCertificate["signature"] = signatureOID
        tbsCertificate["issuer"] = self.getIssuer()
        tbsCertificate["validity"] = self.getValidity()
        tbsCertificate["subject"] = self.getSubject()
        tbsCertificate["subjectPublicKeyInfo"] = (
            self.subjectKey.asSubjectPublicKeyInfo()
        )
        # The extensions field is only set when at least one was added.
        if self.extensions:
            extensions = rfc2459.Extensions().subtype(
                explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3)
            )
            for count, extension in enumerate(self.extensions):
                extensions.setComponentByPosition(count, extension)
            tbsCertificate["extensions"] = extensions
        return tbsCertificate

    def toDER(self):
        """Sign the TBSCertificate with the issuer key and return the
        DER encoding of the complete certificate."""
        (signatureOID, hashAlgorithm) = stringToAlgorithmIdentifiers(self.signature)
        certificate = rfc2459.Certificate()
        tbsCertificate = self.getTBSCertificate()
        certificate["tbsCertificate"] = tbsCertificate
        certificate["signatureAlgorithm"] = signatureOID
        tbsDER = encoder.encode(tbsCertificate)
        certificate["signatureValue"] = self.issuerKey.sign(tbsDER, hashAlgorithm)
        return encoder.encode(certificate)

    def toPEM(self):
        """Return the certificate as PEM text: base64 of the DER bytes in
        64-character lines between BEGIN/END markers, with no trailing
        newline."""
        b64 = six.ensure_text(base64.b64encode(self.toDER()))
        lines = ["-----BEGIN CERTIFICATE-----"]
        lines.extend(b64[start : start + 64] for start in range(0, len(b64), 64))
        lines.append("-----END CERTIFICATE-----")
        return "\n".join(lines)
# Entry point for the build harness: it passes a writable file-like
# object and the path of a specification file. The specification is
# read and the resulting certificate is written out as PEM, followed
# by a newline.
def main(output, inputPath):
    with open(inputPath) as configStream:
        pemText = Certificate(configStream).toPEM()
        output.write(pemText + "\n")
# Standalone use: the specification comes from stdin and the PEM
# certificate is printed to stdout.
if __name__ == "__main__":
    pemText = Certificate(sys.stdin).toPEM()
    print(pemText)