import sys
import warnings
from unittest import TestCase
from base64 import b64decode, urlsafe_b64encode
import mock
from nose.tools import eq_, raises
import six
from . import Receiver, Sender
from .base import Resource, EmptyValue
from .exc import (AlreadyProcessed,
BadHeaderValue,
CredentialsLookupError,
HawkFail,
InvalidCredentials,
MacMismatch,
MisComputedContentHash,
MissingAuthorization,
TokenExpired,
InvalidBewit,
MissingContent)
from .util import (parse_authorization_header,
utc_now,
calculate_payload_hash,
calculate_ts_mac,
validate_credentials)
from .bewit import (get_bewit,
check_bewit,
strip_bewit,
parse_bewit)
# Ensure deprecation warnings are turned to exceptions
warnings.filterwarnings('error' )
class Base(TestCase):
def setUp(self):
self.credentials = {
'id' : 'my-hawk-id' ,
'key' : 'my hAwK sekret' ,
'algorithm' : 'sha256' ,
}
# This callable might be replaced by tests.
def seen_nonce(id, nonce, ts):
return False
self.seen_nonce = seen_nonce
def credentials_map(self, id):
# Pretend this is doing something more interesting like looking up
# a credentials by ID in a database.
if self.credentials['id' ] != id:
raise LookupError('No credentialsuration for Hawk ID {id}'
.format(id=id))
return self.credentials
class TestConfig(Base):
@raises(InvalidCredentials)
def test_no_id(self):
c = self.credentials.copy()
del c['id' ]
validate_credentials(c)
@raises(InvalidCredentials)
def test_no_key(self):
c = self.credentials.copy()
del c['key' ]
validate_credentials(c)
@raises(InvalidCredentials)
def test_no_algo(self):
c = self.credentials.copy()
del c['algorithm' ]
validate_credentials(c)
@raises(InvalidCredentials)
def test_no_credentials(self):
validate_credentials(None )
def test_non_dict_credentials(self):
class WeirdThing(object):
def __getitem__(self, key):
return 'whatever'
validate_credentials(WeirdThing())
class TestSender(Base):
def setUp(self):
super(TestSender, self).setUp()
self.url = 'http://site.com/foo?bar=1 '
def Sender(self, method='GET' , **kw):
credentials = kw.pop('credentials' , self.credentials)
kw.setdefault('content' , '' )
kw.setdefault('content_type' , '' )
sender = Sender(credentials, self.url, method, **kw)
return sender
def receive(self, request_header, url=None , method='GET' , **kw):
credentials_map = kw.pop('credentials_map' , self.credentials_map)
kw.setdefault('content' , '' )
kw.setdefault('content_type' , '' )
kw.setdefault('seen_nonce' , self.seen_nonce)
return Receiver(credentials_map, request_header,
url or self.url, method, **kw)
def test_get_ok(self):
method = 'GET'
sn = self.Sender(method=method)
self.receive(sn.request_header, method=method)
def test_post_ok(self):
method = 'POST'
sn = self.Sender(method=method)
self.receive(sn.request_header, method=method)
def test_post_content_ok(self):
method = 'POST'
content = 'foo=bar&baz=2'
sn = self.Sender(method=method, content=content)
self.receive(sn.request_header, method=method, content=content)
def test_post_content_type_ok(self):
method = 'POST'
content = '{"bar": "foobs"}'
content_type = 'application/json'
sn = self.Sender(method=method, content=content,
content_type=content_type)
self.receive(sn.request_header, method=method, content=content,
content_type=content_type)
def test_post_content_type_with_trailing_charset(self):
method = 'POST'
content = '{"bar": "foobs"}'
content_type = 'application/json; charset=utf8'
sn = self.Sender(method=method, content=content,
content_type=content_type)
self.receive(sn.request_header, method=method, content=content,
content_type='application/json; charset=other' )
@raises(MissingContent)
def test_missing_payload_details(self):
self.Sender(method='POST' , content=EmptyValue,
content_type=EmptyValue)
def test_skip_payload_hashing(self):
method = 'POST'
content = '{"bar": "foobs"}'
content_type = 'application/json'
sn = self.Sender(method=method, content=EmptyValue,
content_type=EmptyValue,
always_hash_content=False )
self.assertFalse('hash="' in sn.request_header)
self.receive(sn.request_header, method=method, content=content,
content_type=content_type,
accept_untrusted_content=True )
def test_empty_payload_hashing(self):
method = 'GET'
content = None
content_type = None
sn = self.Sender(method=method, content=content,
content_type=content_type)
self.assertTrue('hash="' in sn.request_header)
self.receive(sn.request_header, method=method, content=content,
content_type=content_type)
def test_empty_payload_hashing_always_hash_false(self):
method = 'GET'
content = None
content_type = None
sn = self.Sender(method=method, content=content,
content_type=content_type,
always_hash_content=False )
self.assertTrue('hash="' in sn.request_header)
self.receive(sn.request_header, method=method, content=content,
content_type=content_type)
def test_empty_payload_hashing_accept_untrusted(self):
method = 'GET'
content = None
content_type = None
sn = self.Sender(method=method, content=content,
content_type=content_type)
self.assertTrue('hash="' in sn.request_header)
self.receive(sn.request_header, method=method, content=content,
content_type=content_type,
accept_untrusted_content=True )
@raises(MissingContent)
def test_cannot_skip_content_only(self):
self.Sender(method='POST' , content=EmptyValue,
content_type='application/json' )
@raises(MissingContent)
def test_cannot_skip_content_type_only(self):
self.Sender(method='POST' , content='{"foo": "bar"}' ,
content_type=EmptyValue)
@raises(MacMismatch)
def test_tamper_with_host(self):
sn = self.Sender()
self.receive(sn.request_header, url='http://TAMPERED-WITH.com ' )
@raises(MacMismatch)
def test_tamper_with_method(self):
sn = self.Sender(method='GET' )
self.receive(sn.request_header, method='POST' )
@raises(MacMismatch)
def test_tamper_with_path(self):
sn = self.Sender()
self.receive(sn.request_header,
url='http://site.com/TAMPERED?bar=1 ' )
@raises(MacMismatch)
def test_tamper_with_query(self):
sn = self.Sender()
self.receive(sn.request_header,
url='http://site.com/foo?bar=TAMPERED ' )
@raises(MacMismatch)
def test_tamper_with_scheme(self):
sn = self.Sender()
self.receive(sn.request_header, url='https://site.com/foo?bar=1 ' )
@raises(MacMismatch)
def test_tamper_with_port(self):
sn = self.Sender()
self.receive(sn.request_header,
url='http://site.com:8000/foo?bar=1 ' )
@raises(MisComputedContentHash)
def test_tamper_with_content(self):
sn = self.Sender()
self.receive(sn.request_header, content='stuff=nope' )
def test_non_ascii_content(self):
content = u'Ivan Kristi\u0107'
sn = self.Sender(content=content)
self.receive(sn.request_header, content=content)
@raises(MacMismatch)
def test_tamper_with_content_type(self):
sn = self.Sender(method='POST' )
self.receive(sn.request_header, content_type='application/json' )
@raises(AlreadyProcessed)
def test_nonce_fail(self):
def seen_nonce(id, nonce, ts):
return True
sn = self.Sender()
self.receive(sn.request_header, seen_nonce=seen_nonce)
def test_nonce_ok(self):
def seen_nonce(id, nonce, ts):
return False
sn = self.Sender(seen_nonce=seen_nonce)
self.receive(sn.request_header)
@raises(TokenExpired)
def test_expired_ts(self):
now = utc_now() - 120
sn = self.Sender(_timestamp=now)
self.receive(sn.request_header)
def test_expired_exception_reports_localtime(self):
now = utc_now()
ts = now - 120
sn = self.Sender(_timestamp=ts) # force expiry
exc = None
with mock.patch('mohawk.base.utc_now' ) as fake_now:
fake_now.return_value = now
try :
self.receive(sn.request_header)
except :
etype, exc, tb = sys.exc_info()
eq_(type(exc), TokenExpired)
eq_(exc.localtime_in_seconds, now)
def test_localtime_offset(self):
now = utc_now() - 120
sn = self.Sender(_timestamp=now)
# Without an offset this will raise an expired exception.
self.receive(sn.request_header, localtime_offset_in_seconds=-120)
def test_localtime_skew(self):
now = utc_now() - 120
sn = self.Sender(_timestamp=now)
# Without an offset this will raise an expired exception.
self.receive(sn.request_header, timestamp_skew_in_seconds=120)
@raises(MacMismatch)
def test_hash_tampering(self):
sn = self.Sender()
header = sn.request_header.replace('hash="' , 'hash="nope' )
self.receive(header)
@raises(MacMismatch)
def test_bad_secret(self):
cfg = {
'id' : 'my-hawk-id' ,
'key' : 'INCORRECT; YOU FAIL' ,
'algorithm' : 'sha256' ,
}
sn = self.Sender(credentials=cfg)
self.receive(sn.request_header)
@raises(MacMismatch)
def test_unexpected_algorithm(self):
cr = self.credentials.copy()
cr['algorithm' ] = 'sha512'
sn = self.Sender(credentials=cr)
# Validate with mismatched credentials (sha256).
self.receive(sn.request_header)
@raises(InvalidCredentials)
def test_invalid_credentials(self):
cfg = self.credentials.copy()
# Create an invalid credentials.
del cfg['algorithm' ]
self.Sender(credentials=cfg)
@raises(CredentialsLookupError)
def test_unknown_id(self):
cr = self.credentials.copy()
cr['id' ] = 'someone-else'
sn = self.Sender(credentials=cr)
self.receive(sn.request_header)
@raises(MacMismatch)
def test_bad_ext(self):
sn = self.Sender(ext='my external data' )
header = sn.request_header.replace('my external data' , 'TAMPERED' )
self.receive(header)
@raises(BadHeaderValue)
def test_duplicate_keys(self):
sn = self.Sender(ext='someext' )
header = sn.request_header + ', ext="otherext"'
self.receive(header)
@raises(BadHeaderValue)
def test_ext_with_quotes(self):
sn = self.Sender(ext='quotes=""' )
self.receive(sn.request_header)
@raises(BadHeaderValue)
def test_ext_with_new_line(self):
sn = self.Sender(ext="new line \n in the middle" )
self.receive(sn.request_header)
def test_ext_with_equality_sign(self):
sn = self.Sender(ext="foo=bar&foo2=bar2;foo3=bar3" )
self.receive(sn.request_header)
parsed = parse_authorization_header(sn.request_header)
eq_(parsed['ext' ], "foo=bar&foo2=bar2;foo3=bar3" )
@raises(HawkFail)
def test_non_hawk_scheme(self):
parse_authorization_header('Basic user:base64pw' )
@raises(HawkFail)
def test_invalid_key(self):
parse_authorization_header('Hawk mac="validmac" unknownkey="value"' )
def test_ext_with_all_valid_characters(self):
valid_characters = "!#$%&'()*+,-./:;<=>?@[]^_`{|}~ azAZ09_"
sender = self.Sender(ext=valid_characters)
parsed = parse_authorization_header(sender.request_header)
eq_(parsed['ext' ], valid_characters)
@raises(BadHeaderValue)
def test_ext_with_illegal_chars(self):
self.Sender(ext="something like \t is illegal" )
def test_unparseable_header(self):
try :
parse_authorization_header('Hawk mac="somemac", unparseable' )
except BadHeaderValue as exc:
error_msg = str(exc)
self.assertTrue("Couldn't parse Hawk header" in error_msg)
self.assertTrue("unparseable" in error_msg)
else :
self.fail('should raise' )
@raises(BadHeaderValue)
def test_ext_with_illegal_unicode(self):
self.Sender(ext=u'Ivan Kristi\u0107' )
@raises(BadHeaderValue)
def test_too_long_header(self):
sn = self.Sender(ext='a' *5000)
self.receive(sn.request_header)
@raises(BadHeaderValue)
def test_ext_with_illegal_utf8(self):
# This isn't allowed because the escaped byte chars are out of
# range.
self.Sender(ext=u'Ivan Kristi\u0107' .encode('utf8' ))
def test_app_ok(self):
app = 'custom-app'
sn = self.Sender(app=app)
self.receive(sn.request_header)
parsed = parse_authorization_header(sn.request_header)
eq_(parsed['app' ], app)
@raises(MacMismatch)
def test_tampered_app(self):
app = 'custom-app'
sn = self.Sender(app=app)
header = sn.request_header.replace(app, 'TAMPERED-WITH' )
self.receive(header)
def test_dlg_ok(self):
dlg = 'custom-dlg'
sn = self.Sender(dlg=dlg)
self.receive(sn.request_header)
parsed = parse_authorization_header(sn.request_header)
eq_(parsed['dlg' ], dlg)
@raises(MacMismatch)
def test_tampered_dlg(self):
dlg = 'custom-dlg'
sn = self.Sender(dlg=dlg, app='some-app' )
header = sn.request_header.replace(dlg, 'TAMPERED-WITH' )
self.receive(header)
def test_file_content(self):
method = "POST"
content = six.BytesIO(b"FILE CONTENT" )
sn = self.Sender(method, content=content)
self.receive(sn.request_header, method=method, content=content.getvalue())
def test_binary_file_content(self):
method = "POST"
content = six.BytesIO(b"\x00\xffCONTENT\xff\x00" )
sn = self.Sender(method, content=content)
self.receive(sn.request_header, method=method, content=content.getvalue())
@raises(MisComputedContentHash)
def test_bad_file_content(self):
method = "POST"
content = six.BytesIO(b"FILE CONTENT" )
sn = self.Sender(method, content=content)
self.receive(sn.request_header, method=method, content="BAD FILE CONTENT" )
class TestReceiver(Base):
def setUp(self):
super(TestReceiver, self).setUp()
self.url = 'http://site.com/ '
self.sender = None
self.receiver = None
def receive(self, method='GET' , **kw):
url = kw.pop('url' , self.url)
sender = kw.pop('sender' , None )
sender_kw = kw.pop('sender_kw' , {})
sender_kw.setdefault('content' , '' )
sender_kw.setdefault('content_type' , '' )
sender_url = kw.pop('sender_url' , url)
credentials_map = kw.pop('credentials_map' ,
lambda id: self.credentials)
if sender:
self.sender = sender
else :
self.sender = Sender(self.credentials, sender_url, method,
**sender_kw)
kw.setdefault('content' , '' )
kw.setdefault('content_type' , '' )
self.receiver = Receiver(credentials_map,
self.sender.request_header, url, method,
**kw)
def respond(self, **kw):
accept_kw = kw.pop('accept_kw' , {})
accept_kw.setdefault('content' , '' )
accept_kw.setdefault('content_type' , '' )
receiver = kw.pop('receiver' , self.receiver)
kw.setdefault('content' , '' )
kw.setdefault('content_type' , '' )
receiver.respond(**kw)
self.sender.accept_response(receiver.response_header, **accept_kw)
return receiver.response_header
@raises(InvalidCredentials)
def test_invalid_credentials_lookup(self):
# Return invalid credentials.
self.receive(credentials_map=lambda *a: {})
def test_get_ok(self):
method = 'GET'
self.receive(method=method)
self.respond()
def test_post_ok(self):
method = 'POST'
self.receive(method=method)
self.respond()
@raises(MisComputedContentHash)
def test_respond_with_wrong_content(self):
self.receive()
self.respond(content='real content' ,
accept_kw=dict(content='TAMPERED WITH' ))
@raises(MisComputedContentHash)
def test_respond_with_wrong_content_type(self):
self.receive()
self.respond(content_type='text/html' ,
accept_kw=dict(content_type='application/json' ))
@raises(MissingAuthorization)
def test_missing_authorization(self):
Receiver(lambda id: self.credentials, None , '/' , 'GET' )
@raises(MacMismatch)
def test_respond_with_wrong_url(self):
self.receive(url='http://fakesite.com ' )
wrong_receiver = self.receiver
self.receive(url='http://realsite.com ' )
self.respond(receiver=wrong_receiver)
@raises(MacMismatch)
def test_respond_with_wrong_method(self):
self.receive(method='GET' )
wrong_receiver = self.receiver
self.receive(method='POST' )
self.respond(receiver=wrong_receiver)
@raises(MacMismatch)
def test_respond_with_wrong_nonce(self):
self.receive(sender_kw=dict(nonce='another-nonce' ))
wrong_receiver = self.receiver
self.receive()
# The nonce must match the one sent in the original request.
self.respond(receiver=wrong_receiver)
def test_respond_with_unhashed_content(self):
self.receive()
self.respond(always_hash_content=False , content=None ,
content_type=None ,
accept_kw=dict(accept_untrusted_content=True ))
@raises(TokenExpired)
def test_respond_with_expired_ts(self):
self.receive()
hdr = self.receiver.respond(content='' , content_type='' )
with mock.patch('mohawk.base.utc_now' ) as fn:
fn.return_value = 0 # force an expiry
try :
self.sender.accept_response(hdr, content='' , content_type='' )
except TokenExpired:
etype, exc, tb = sys.exc_info()
hdr = parse_authorization_header(exc.www_authenticate)
calculated = calculate_ts_mac(fn(), self.credentials)
if isinstance(calculated, six.binary_type):
calculated = calculated.decode('ascii' )
eq_(hdr['tsm' ], calculated)
raise
def test_respond_with_bad_ts_skew_ok(self):
now = utc_now() - 120
self.receive()
hdr = self.receiver.respond(content='' , content_type='' )
with mock.patch('mohawk.base.utc_now' ) as fn:
fn.return_value = now
# Without an offset this will raise an expired exception.
self.sender.accept_response(hdr, content='' , content_type='' ,
timestamp_skew_in_seconds=120)
def test_respond_with_ext(self):
self.receive()
ext = 'custom-ext'
self.respond(ext=ext)
header = parse_authorization_header(self.receiver.response_header)
eq_(header['ext' ], ext)
@raises(MacMismatch)
def test_respond_with_wrong_app(self):
self.receive(sender_kw=dict(app='TAMPERED-WITH' , dlg='delegation' ))
self.receiver.respond(content='' , content_type='' )
wrong_receiver = self.receiver
self.receive(sender_kw=dict(app='real-app' , dlg='delegation' ))
self.sender.accept_response(wrong_receiver.response_header,
content='' , content_type='' )
@raises(MacMismatch)
def test_respond_with_wrong_dlg(self):
self.receive(sender_kw=dict(app='app' , dlg='TAMPERED-WITH' ))
self.receiver.respond(content='' , content_type='' )
wrong_receiver = self.receiver
self.receive(sender_kw=dict(app='app' , dlg='real-dlg' ))
self.sender.accept_response(wrong_receiver.response_header,
content='' , content_type='' )
@raises(MacMismatch)
def test_receive_wrong_method(self):
self.receive(method='GET' )
wrong_sender = self.sender
self.receive(method='POST' , sender=wrong_sender)
@raises(MacMismatch)
def test_receive_wrong_url(self):
self.receive(url='http://fakesite.com/ ' )
wrong_sender = self.sender
self.receive(url='http://realsite.com/ ' , sender=wrong_sender)
@raises(MisComputedContentHash)
def test_receive_wrong_content(self):
self.receive(sender_kw=dict(content='real request' ),
content='real request' )
wrong_sender = self.sender
self.receive(content='TAMPERED WITH' , sender=wrong_sender)
def test_expected_unhashed_empty_content(self):
# This test sets up a scenario where the receiver will receive empty
# strings for content and content_type and no content hash in the auth
# header.
# This is to account for callers that might provide empty strings for
# the payload when in fact there is literally no content. In this case,
# mohawk depends on the presence of the content hash in the auth header
# to determine how to treat the empty strings: no hash in the header
# implies that no hashing is expected to occur on the server.
self.receive(content='' ,
content_type='' ,
sender_kw=dict(content=EmptyValue,
content_type=EmptyValue,
always_hash_content=False ))
@raises(MisComputedContentHash)
def test_expected_unhashed_empty_content_with_content_type(self):
# This test sets up a scenario where the receiver will receive an
# empty content string and no content hash in the auth header, but
# some value for content_type.
# This is to confirm that the hash is calculated and compared (to the
# hash of mock empty payload, which should fail) when it appears that
# the sender has sent a 0-length payload body.
self.receive(content='' ,
content_type='text/plain' ,
sender_kw=dict(content=EmptyValue,
content_type=EmptyValue,
always_hash_content=False ))
@raises(MisComputedContentHash)
def test_expected_unhashed_content_with_empty_content_type(self):
# This test sets up a scenario where the receiver will receive some
# content but the empty string for the content_type and no content hash
# in the auth header.
# This is to confirm that the hash is calculated and compared (to the
# hash of mock empty payload, which should fail) when the sender has
# sent unhashed content.
self.receive(content='some content' ,
content_type='' ,
sender_kw=dict(content=EmptyValue,
content_type=EmptyValue,
always_hash_content=False ))
def test_empty_content_with_content_type(self):
# This test sets up a scenario where the receiver will receive an
# empty content string, some value for content_type and a content hash.
# This is to confirm that the hash is calculated and compared correctly
# when the sender has sent a hashed 0-length payload body.
self.receive(content='' ,
content_type='text/plain' ,
sender_kw=dict(content='' ,
content_type='text/plain' ))
def test_expected_unhashed_no_content(self):
# This test sets up a scenario where the receiver will receive None for
# content and content_type and no content hash in the auth header.
# This is like test_expected_unhashed_empty_content(), but tests for
# the less ambiguous case where the caller has explicitly passed in None
# to indicate that there is no content to hash.
self.receive(content=None ,
content_type=None ,
sender_kw=dict(content=EmptyValue,
content_type=EmptyValue,
always_hash_content=False ))
@raises(MisComputedContentHash)
def test_expected_unhashed_no_content_with_content_type(self):
# This test sets up a scenario where the receiver will receive None for
# content and no content hash in the auth header, but some value for
# content_type.
# In this case, the content will be coerced to the empty string for
# hashing purposes. The request should fail, as the there is no content
# hash in the request to compare against. While this may not be in
# accordance with the js reference spec, it's the safest (ie. most
# secure) way of handling this bizarre set of circumstances.
self.receive(content=None ,
content_type='text/plain' ,
sender_kw=dict(content=EmptyValue,
content_type=EmptyValue,
always_hash_content=False ))
@raises(MisComputedContentHash)
def test_expected_unhashed_content_with_no_content_type(self):
# This test sets up a scenario where the receiver will receive some
# content but no value for the content_type and no content hash in
# the auth header.
# This is to confirm that the hash is calculated and compared (to the
# hash of mock empty payload, which should fail) when the sender has
# sent unhashed content.
self.receive(content='some content' ,
content_type=None ,
sender_kw=dict(content=EmptyValue,
content_type=EmptyValue,
always_hash_content=False ))
def test_no_content_with_content_type(self):
# This test sets up a scenario where the receiver will receive None for
# the content string, some value for content_type and a content hash.
# This is to confirm that coercing None to the empty string when a hash
# is expected allows the hash to be calculated and compared correctly
# as if the sender has sent a hashed 0-length payload body.
self.receive(content=None ,
content_type='text/plain' ,
sender_kw=dict(content='' ,
content_type='text/plain' ))
@raises(MissingContent)
def test_cannot_receive_empty_content_only(self):
content_type = 'text/plain'
self.receive(sender_kw=dict(content='' ,
content_type=content_type),
content=EmptyValue, content_type=content_type)
@raises(MissingContent)
def test_cannot_receive_empty_content_type_only(self):
content = ''
self.receive(sender_kw=dict(content=content,
content_type='text/plain' ),
content=content, content_type=EmptyValue)
@raises(MisComputedContentHash)
def test_receive_wrong_content_type(self):
self.receive(sender_kw=dict(content_type='text/html' ),
content_type='text/html' )
wrong_sender = self.sender
self.receive(content_type='application/json' ,
sender=wrong_sender)
class TestSendAndReceive(Base):
def test(self):
credentials = {
'id' : 'some-id' ,
'key' : 'some secret' ,
'algorithm' : 'sha256'
}
url = 'https://my-site.com/ '
method = 'POST'
# The client sends a request with a Hawk header.
content = 'foo=bar&baz=nooz'
content_type = 'application/x-www-form-urlencoded'
sender = Sender(credentials,
url, method,
content=content,
content_type=content_type)
# The server receives a request and authorizes access.
receiver = Receiver(lambda id: credentials,
sender.request_header,
url, method,
content=content,
content_type=content_type)
# The server responds with a similar Hawk header.
content = 'we are friends'
content_type = 'text/plain'
receiver.respond(content=content,
content_type=content_type)
# The client receives a response and authorizes access.
sender.accept_response(receiver.response_header,
content=content,
content_type=content_type)
class TestBewit(Base):
# Test cases copied from
# https://github.com/hueniverse/hawk/blob/492632da51ecedd5f59ce96f081860ad24ce6532/test/uri.js
def setUp(self):
self.credentials = {
'id' : '123456' ,
'key' : '2983d45yun89q' ,
'algorithm' : 'sha256' ,
}
def make_credential_lookup(self, credentials_map):
# Helper function to make a lookup function given a dictionary of
# credentials
def lookup(client_id):
# Will raise a KeyError if missing; which is a subclass of
# LookupError
return credentials_map[client_id]
return lookup
def test_bewit(self):
res = Resource(url='https://example.com/somewhere/over/the/rainbow ' ,
method='GET' , credentials=self.credentials,
timestamp=1356420407 + 300,
nonce='' ,
)
bewit = get_bewit(res)
expected = '123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\'
eq_(b64decode(bewit).decode('ascii' ), expected)
def test_bewit_with_binary_id(self):
# Check for exceptions in get_bewit call with binary id
binary_credentials = self.credentials.copy()
binary_credentials['id' ] = binary_credentials['id' ].encode('ascii' )
res = Resource(url='https://example.com/somewhere/over/the/rainbow ' ,
method='GET' , credentials=binary_credentials,
timestamp=1356420407 + 300,
nonce='' ,
)
get_bewit(res)
def test_bewit_with_ext(self):
res = Resource(url='https://example.com/somewhere/over/the/rainbow ' ,
method='GET' , credentials=self.credentials,
timestamp=1356420407 + 300,
nonce='' ,
ext='xandyandz'
)
bewit = get_bewit(res)
expected = '123456\\1356420707\\kscxwNR2tJpP1T1zDLNPbB5UiKIU9tOSJXTUdG7X9h8=\\xandyandz'
eq_(b64decode(bewit).decode('ascii' ), expected)
@raises(BadHeaderValue)
def test_bewit_with_invalid_ext(self):
res = Resource(url='https://example.com/somewhere/over/the/rainbow ' ,
method='GET' , credentials=self.credentials,
timestamp=1356420407 + 300,
nonce='' ,
ext='xand\\yandz' )
get_bewit(res)
@raises(BadHeaderValue)
def test_bewit_with_backslashes_in_id(self):
credentials = self.credentials
credentials['id' ] = '123\\456'
res = Resource(url='https://example.com/somewhere/over/the/rainbow ' ,
method='GET' , credentials=self.credentials,
timestamp=1356420407 + 300,
nonce='' )
get_bewit(res)
def test_bewit_with_port(self):
res = Resource(url='https://example.com:8080/somewhere/over/the/rainbow ' ,
method='GET' , credentials=self.credentials,
timestamp=1356420407 + 300, nonce='' , ext='xandyandz' )
bewit = get_bewit(res)
expected = '123456\\1356420707\\hZbJ3P2cKEo4ky0C8jkZAkRyCZueg4WSNbxV7vq3xHU=\\xandyandz'
eq_(b64decode(bewit).decode('ascii' ), expected)
@raises(ValueError)
def test_bewit_with_nonce(self):
res = Resource(url='https://example.com/somewhere/over/the/rainbow ' ,
method='GET' , credentials=self.credentials,
timestamp=1356420407 + 300,
nonce='n1' )
get_bewit(res)
@raises(ValueError)
def test_bewit_invalid_method(self):
res = Resource(url='https://example.com:8080/somewhere/over/the/rainbow ' ,
method='POST' , credentials=self.credentials,
timestamp=1356420407 + 300, nonce='' )
get_bewit(res)
def test_strip_bewit(self):
bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\'
bewit = urlsafe_b64encode(bewit).decode('ascii' )
url = "https://example.com/somewhere/over/the/rainbow?bewit={bewit} " .format(bewit=bewit)
raw_bewit, stripped_url = strip_bewit(url)
self.assertEqual(raw_bewit, bewit)
self.assertEqual(stripped_url, "https://example.com/somewhere/over/the/rainbow " )
@raises(InvalidBewit)
def test_strip_url_without_bewit(self):
url = "https://example.com/somewhere/over/the/rainbow "
strip_bewit(url)
def test_parse_bewit(self):
bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\'
bewit = urlsafe_b64encode(bewit).decode('ascii' )
bewit = parse_bewit(bewit)
self.assertEqual(bewit.id, '123456' )
self.assertEqual(bewit.expiration, '1356420707' )
self.assertEqual(bewit.mac, 'IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=' )
self.assertEqual(bewit.ext, '' )
def test_parse_bewit_with_ext(self):
bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\xandyandz'
bewit = urlsafe_b64encode(bewit).decode('ascii' )
bewit = parse_bewit(bewit)
self.assertEqual(bewit.id, '123456' )
self.assertEqual(bewit.expiration, '1356420707' )
self.assertEqual(bewit.mac, 'IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=' )
self.assertEqual(bewit.ext, 'xandyandz' )
@raises(InvalidBewit)
def test_parse_bewit_with_ext_and_backslashes(self):
bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\xand\\yandz'
bewit = urlsafe_b64encode(bewit).decode('ascii' )
parse_bewit(bewit)
@raises(InvalidBewit)
def test_parse_invalid_bewit_with_only_one_part(self):
bewit = b'12345'
bewit = urlsafe_b64encode(bewit).decode('ascii' )
bewit = parse_bewit(bewit)
@raises(InvalidBewit)
def test_parse_invalid_bewit_with_only_two_parts(self):
bewit = b'1\\2'
bewit = urlsafe_b64encode(bewit).decode('ascii' )
bewit = parse_bewit(bewit)
def test_validate_bewit(self):
bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\'
bewit = urlsafe_b64encode(bewit).decode('ascii' )
url = "https://example.com/somewhere/over/the/rainbow?bewit={bewit} " .format(bewit=bewit)
credential_lookup = self.make_credential_lookup({
self.credentials['id' ]: self.credentials,
})
self.assertTrue(check_bewit(url, credential_lookup=credential_lookup, now=1356420407 + 10))
def test_validate_bewit_with_ext(self):
bewit = b'123456\\1356420707\\kscxwNR2tJpP1T1zDLNPbB5UiKIU9tOSJXTUdG7X9h8=\\xandyandz'
bewit = urlsafe_b64encode(bewit).decode('ascii' )
url = "https://example.com/somewhere/over/the/rainbow?bewit={bewit} " .format(bewit=bewit)
credential_lookup = self.make_credential_lookup({
self.credentials['id' ]: self.credentials,
})
self.assertTrue(check_bewit(url, credential_lookup=credential_lookup, now=1356420407 + 10))
@raises(InvalidBewit)
def test_validate_bewit_with_ext_and_backslashes(self):
bewit = b'123456\\1356420707\\b82LLIxG5UDkaChLU953mC+SMrbniV1sb8KiZi9cSsc=\\xand\\yandz'
bewit = urlsafe_b64encode(bewit).decode('ascii' )
url = "https://example.com/somewhere/over/the/rainbow?bewit={bewit} " .format(bewit=bewit)
credential_lookup = self.make_credential_lookup({
self.credentials['id' ]: self.credentials,
})
check_bewit(url, credential_lookup=credential_lookup, now=1356420407 + 10)
@raises(TokenExpired)
def test_validate_expired_bewit(self):
bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\'
bewit = urlsafe_b64encode(bewit).decode('ascii' )
url = "https://example.com/somewhere/over/the/rainbow?bewit={bewit} " .format(bewit=bewit)
credential_lookup = self.make_credential_lookup({
self.credentials['id' ]: self.credentials,
})
check_bewit(url, credential_lookup=credential_lookup, now=1356420407 + 1000)
@raises(CredentialsLookupError)
def test_validate_bewit_with_unknown_credentials(self):
bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\'
bewit = urlsafe_b64encode(bewit).decode('ascii' )
url = "https://example.com/somewhere/over/the/rainbow?bewit={bewit} " .format(bewit=bewit)
credential_lookup = self.make_credential_lookup({
'other_id' : self.credentials,
})
check_bewit(url, credential_lookup=credential_lookup, now=1356420407 + 10)
class TestPayloadHash(Base):
def test_hash_file_read_blocks(self):
payload = six.BytesIO(b"\x00\xffhello world\xff\x00" )
h1 = calculate_payload_hash(payload, 'sha256' , 'application/json' , block_size=1)
payload.seek(0)
h2 = calculate_payload_hash(payload, 'sha256' , 'application/json' , block_size=1024)
self.assertEqual(h1, h2)
quality 100%
¤ Dauer der Verarbeitung: 0.23 Sekunden
(vorverarbeitet)
¤
*© Formatika GbR, Deutschland