Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  tests.py   Sprache: Python

 
import sys
import warnings
from unittest import TestCase
from base64 import b64decode, urlsafe_b64encode

import mock
from nose.tools import eq_, raises
import six

from . import Receiver, Sender
from .base import Resource, EmptyValue
from .exc import (AlreadyProcessed,
                  BadHeaderValue,
                  CredentialsLookupError,
                  HawkFail,
                  InvalidCredentials,
                  MacMismatch,
                  MisComputedContentHash,
                  MissingAuthorization,
                  TokenExpired,
                  InvalidBewit,
                  MissingContent)
from .util import (parse_authorization_header,
                   utc_now,
                   calculate_payload_hash,
                   calculate_ts_mac,
                   validate_credentials)
from .bewit import (get_bewit,
                    check_bewit,
                    strip_bewit,
                    parse_bewit)


# Ensure deprecation warnings are turned to exceptions
warnings.filterwarnings('error')


class Base(TestCase):

    def setUp(self):
        self.credentials = {
            'id''my-hawk-id',
            'key''my hAwK sekret',
            'algorithm''sha256',
        }

        # This callable might be replaced by tests.
        def seen_nonce(id, nonce, ts):
            return False
        self.seen_nonce = seen_nonce

    def credentials_map(self, id):
        # Pretend this is doing something more interesting like looking up
        # a credentials by ID in a database.
        if self.credentials['id'] != id:
            raise LookupError('No credentialsuration for Hawk ID {id}'
                              .format(id=id))
        return self.credentials


class TestConfig(Base):

    @raises(InvalidCredentials)
    def test_no_id(self):
        c = self.credentials.copy()
        del c['id']
        validate_credentials(c)

    @raises(InvalidCredentials)
    def test_no_key(self):
        c = self.credentials.copy()
        del c['key']
        validate_credentials(c)

    @raises(InvalidCredentials)
    def test_no_algo(self):
        c = self.credentials.copy()
        del c['algorithm']
        validate_credentials(c)

    @raises(InvalidCredentials)
    def test_no_credentials(self):
        validate_credentials(None)

    def test_non_dict_credentials(self):
        class WeirdThing(object):
            def __getitem__(self, key):
                return 'whatever'
        validate_credentials(WeirdThing())


class TestSender(Base):

    def setUp(self):
        super(TestSender, self).setUp()
        self.url = 'http://site.com/foo?bar=1'

    def Sender(self, method='GET', **kw):
        credentials = kw.pop('credentials', self.credentials)
        kw.setdefault('content''')
        kw.setdefault('content_type''')
        sender = Sender(credentials, self.url, method, **kw)
        return sender

    def receive(self, request_header, url=None, method='GET', **kw):
        credentials_map = kw.pop('credentials_map', self.credentials_map)
        kw.setdefault('content''')
        kw.setdefault('content_type''')
        kw.setdefault('seen_nonce', self.seen_nonce)
        return Receiver(credentials_map, request_header,
                        url or self.url, method, **kw)

    def test_get_ok(self):
        method = 'GET'
        sn = self.Sender(method=method)
        self.receive(sn.request_header, method=method)

    def test_post_ok(self):
        method = 'POST'
        sn = self.Sender(method=method)
        self.receive(sn.request_header, method=method)

    def test_post_content_ok(self):
        method = 'POST'
        content = 'foo=bar&baz=2'
        sn = self.Sender(method=method, content=content)
        self.receive(sn.request_header, method=method, content=content)

    def test_post_content_type_ok(self):
        method = 'POST'
        content = '{"bar": "foobs"}'
        content_type = 'application/json'
        sn = self.Sender(method=method, content=content,
                         content_type=content_type)
        self.receive(sn.request_header, method=method, content=content,
                     content_type=content_type)

    def test_post_content_type_with_trailing_charset(self):
        method = 'POST'
        content = '{"bar": "foobs"}'
        content_type = 'application/json; charset=utf8'
        sn = self.Sender(method=method, content=content,
                         content_type=content_type)
        self.receive(sn.request_header, method=method, content=content,
                     content_type='application/json; charset=other')

    @raises(MissingContent)
    def test_missing_payload_details(self):
        self.Sender(method='POST', content=EmptyValue,
                    content_type=EmptyValue)

    def test_skip_payload_hashing(self):
        method = 'POST'
        content = '{"bar": "foobs"}'
        content_type = 'application/json'
        sn = self.Sender(method=method, content=EmptyValue,
                         content_type=EmptyValue,
                         always_hash_content=False)
        self.assertFalse('hash="' in sn.request_header)
        self.receive(sn.request_header, method=method, content=content,
                     content_type=content_type,
                     accept_untrusted_content=True)

    def test_empty_payload_hashing(self):
        method = 'GET'
        content = None
        content_type = None
        sn = self.Sender(method=method, content=content,
                         content_type=content_type)
        self.assertTrue('hash="' in sn.request_header)
        self.receive(sn.request_header, method=method, content=content,
                     content_type=content_type)

    def test_empty_payload_hashing_always_hash_false(self):
        method = 'GET'
        content = None
        content_type = None
        sn = self.Sender(method=method, content=content,
                         content_type=content_type,
                         always_hash_content=False)
        self.assertTrue('hash="' in sn.request_header)
        self.receive(sn.request_header, method=method, content=content,
                     content_type=content_type)

    def test_empty_payload_hashing_accept_untrusted(self):
        method = 'GET'
        content = None
        content_type = None
        sn = self.Sender(method=method, content=content,
                         content_type=content_type)
        self.assertTrue('hash="' in sn.request_header)
        self.receive(sn.request_header, method=method, content=content,
                     content_type=content_type,
                     accept_untrusted_content=True)

    @raises(MissingContent)
    def test_cannot_skip_content_only(self):
        self.Sender(method='POST', content=EmptyValue,
                    content_type='application/json')

    @raises(MissingContent)
    def test_cannot_skip_content_type_only(self):
        self.Sender(method='POST', content='{"foo": "bar"}',
                    content_type=EmptyValue)

    @raises(MacMismatch)
    def test_tamper_with_host(self):
        sn = self.Sender()
        self.receive(sn.request_header, url='http://TAMPERED-WITH.com')

    @raises(MacMismatch)
    def test_tamper_with_method(self):
        sn = self.Sender(method='GET')
        self.receive(sn.request_header, method='POST')

    @raises(MacMismatch)
    def test_tamper_with_path(self):
        sn = self.Sender()
        self.receive(sn.request_header,
                     url='http://site.com/TAMPERED?bar=1')

    @raises(MacMismatch)
    def test_tamper_with_query(self):
        sn = self.Sender()
        self.receive(sn.request_header,
                     url='http://site.com/foo?bar=TAMPERED')

    @raises(MacMismatch)
    def test_tamper_with_scheme(self):
        sn = self.Sender()
        self.receive(sn.request_header, url='https://site.com/foo?bar=1')

    @raises(MacMismatch)
    def test_tamper_with_port(self):
        sn = self.Sender()
        self.receive(sn.request_header,
                     url='http://site.com:8000/foo?bar=1')

    @raises(MisComputedContentHash)
    def test_tamper_with_content(self):
        sn = self.Sender()
        self.receive(sn.request_header, content='stuff=nope')

    def test_non_ascii_content(self):
        content = u'Ivan Kristi\u0107'
        sn = self.Sender(content=content)
        self.receive(sn.request_header, content=content)

    @raises(MacMismatch)
    def test_tamper_with_content_type(self):
        sn = self.Sender(method='POST')
        self.receive(sn.request_header, content_type='application/json')

    @raises(AlreadyProcessed)
    def test_nonce_fail(self):

        def seen_nonce(id, nonce, ts):
            return True

        sn = self.Sender()

        self.receive(sn.request_header, seen_nonce=seen_nonce)

    def test_nonce_ok(self):

        def seen_nonce(id, nonce, ts):
            return False

        sn = self.Sender(seen_nonce=seen_nonce)
        self.receive(sn.request_header)

    @raises(TokenExpired)
    def test_expired_ts(self):
        now = utc_now() - 120
        sn = self.Sender(_timestamp=now)
        self.receive(sn.request_header)

    def test_expired_exception_reports_localtime(self):
        now = utc_now()
        ts = now - 120
        sn = self.Sender(_timestamp=ts)  # force expiry

        exc = None
        with mock.patch('mohawk.base.utc_now'as fake_now:
            fake_now.return_value = now
            try:
                self.receive(sn.request_header)
            except:
                etype, exc, tb = sys.exc_info()

        eq_(type(exc), TokenExpired)
        eq_(exc.localtime_in_seconds, now)

    def test_localtime_offset(self):
        now = utc_now() - 120
        sn = self.Sender(_timestamp=now)
        # Without an offset this will raise an expired exception.
        self.receive(sn.request_header, localtime_offset_in_seconds=-120)

    def test_localtime_skew(self):
        now = utc_now() - 120
        sn = self.Sender(_timestamp=now)
        # Without an offset this will raise an expired exception.
        self.receive(sn.request_header, timestamp_skew_in_seconds=120)

    @raises(MacMismatch)
    def test_hash_tampering(self):
        sn = self.Sender()
        header = sn.request_header.replace('hash="''hash="nope')
        self.receive(header)

    @raises(MacMismatch)
    def test_bad_secret(self):
        cfg = {
            'id''my-hawk-id',
            'key''INCORRECT; YOU FAIL',
            'algorithm''sha256',
        }
        sn = self.Sender(credentials=cfg)
        self.receive(sn.request_header)

    @raises(MacMismatch)
    def test_unexpected_algorithm(self):
        cr = self.credentials.copy()
        cr['algorithm'] = 'sha512'
        sn = self.Sender(credentials=cr)

        # Validate with mismatched credentials (sha256).
        self.receive(sn.request_header)

    @raises(InvalidCredentials)
    def test_invalid_credentials(self):
        cfg = self.credentials.copy()
        # Create an invalid credentials.
        del cfg['algorithm']

        self.Sender(credentials=cfg)

    @raises(CredentialsLookupError)
    def test_unknown_id(self):
        cr = self.credentials.copy()
        cr['id'] = 'someone-else'
        sn = self.Sender(credentials=cr)

        self.receive(sn.request_header)

    @raises(MacMismatch)
    def test_bad_ext(self):
        sn = self.Sender(ext='my external data')

        header = sn.request_header.replace('my external data''TAMPERED')
        self.receive(header)

    @raises(BadHeaderValue)
    def test_duplicate_keys(self):
        sn = self.Sender(ext='someext')
        header = sn.request_header + ', ext="otherext"'
        self.receive(header)

    @raises(BadHeaderValue)
    def test_ext_with_quotes(self):
        sn = self.Sender(ext='quotes=""')
        self.receive(sn.request_header)

    @raises(BadHeaderValue)
    def test_ext_with_new_line(self):
        sn = self.Sender(ext="new line \n in the middle")
        self.receive(sn.request_header)

    def test_ext_with_equality_sign(self):
        sn = self.Sender(ext="foo=bar&foo2=bar2;foo3=bar3")
        self.receive(sn.request_header)
        parsed = parse_authorization_header(sn.request_header)
        eq_(parsed['ext'], "foo=bar&foo2=bar2;foo3=bar3")

    @raises(HawkFail)
    def test_non_hawk_scheme(self):
        parse_authorization_header('Basic user:base64pw')

    @raises(HawkFail)
    def test_invalid_key(self):
        parse_authorization_header('Hawk mac="validmac" unknownkey="value"')

    def test_ext_with_all_valid_characters(self):
        valid_characters = "!#$%&'()*+,-./:;<=>?@[]^_`{|}~ azAZ09_"
        sender = self.Sender(ext=valid_characters)
        parsed = parse_authorization_header(sender.request_header)
        eq_(parsed['ext'], valid_characters)

    @raises(BadHeaderValue)
    def test_ext_with_illegal_chars(self):
        self.Sender(ext="something like \t is illegal")

    def test_unparseable_header(self):
        try:
            parse_authorization_header('Hawk mac="somemac", unparseable')
        except BadHeaderValue as exc:
            error_msg = str(exc)
            self.assertTrue("Couldn't parse Hawk header" in error_msg)
            self.assertTrue("unparseable" in error_msg)
        else:
            self.fail('should raise')

    @raises(BadHeaderValue)
    def test_ext_with_illegal_unicode(self):
        self.Sender(ext=u'Ivan Kristi\u0107')

    @raises(BadHeaderValue)
    def test_too_long_header(self):
        sn = self.Sender(ext='a'*5000)
        self.receive(sn.request_header)

    @raises(BadHeaderValue)
    def test_ext_with_illegal_utf8(self):
        # This isn't allowed because the escaped byte chars are out of
        # range.
        self.Sender(ext=u'Ivan Kristi\u0107'.encode('utf8'))

    def test_app_ok(self):
        app = 'custom-app'
        sn = self.Sender(app=app)
        self.receive(sn.request_header)
        parsed = parse_authorization_header(sn.request_header)
        eq_(parsed['app'], app)

    @raises(MacMismatch)
    def test_tampered_app(self):
        app = 'custom-app'
        sn = self.Sender(app=app)
        header = sn.request_header.replace(app, 'TAMPERED-WITH')
        self.receive(header)

    def test_dlg_ok(self):
        dlg = 'custom-dlg'
        sn = self.Sender(dlg=dlg)
        self.receive(sn.request_header)
        parsed = parse_authorization_header(sn.request_header)
        eq_(parsed['dlg'], dlg)

    @raises(MacMismatch)
    def test_tampered_dlg(self):
        dlg = 'custom-dlg'
        sn = self.Sender(dlg=dlg, app='some-app')
        header = sn.request_header.replace(dlg, 'TAMPERED-WITH')
        self.receive(header)

    def test_file_content(self):
        method = "POST"
        content = six.BytesIO(b"FILE CONTENT")
        sn = self.Sender(method, content=content)
        self.receive(sn.request_header, method=method, content=content.getvalue())

    def test_binary_file_content(self):
        method = "POST"
        content = six.BytesIO(b"\x00\xffCONTENT\xff\x00")
        sn = self.Sender(method, content=content)
        self.receive(sn.request_header, method=method, content=content.getvalue())

    @raises(MisComputedContentHash)
    def test_bad_file_content(self):
        method = "POST"
        content = six.BytesIO(b"FILE CONTENT")
        sn = self.Sender(method, content=content)
        self.receive(sn.request_header, method=method, content="BAD FILE CONTENT")


class TestReceiver(Base):

    def setUp(self):
        super(TestReceiver, self).setUp()
        self.url = 'http://site.com/'
        self.sender = None
        self.receiver = None

    def receive(self, method='GET', **kw):
        url = kw.pop('url', self.url)
        sender = kw.pop('sender'None)
        sender_kw = kw.pop('sender_kw', {})
        sender_kw.setdefault('content''')
        sender_kw.setdefault('content_type''')
        sender_url = kw.pop('sender_url', url)

        credentials_map = kw.pop('credentials_map',
                                 lambda id: self.credentials)

        if sender:
            self.sender = sender
        else:
            self.sender = Sender(self.credentials, sender_url, method,
                                 **sender_kw)

        kw.setdefault('content''')
        kw.setdefault('content_type''')
        self.receiver = Receiver(credentials_map,
                                 self.sender.request_header, url, method,
                                 **kw)

    def respond(self, **kw):
        accept_kw = kw.pop('accept_kw', {})
        accept_kw.setdefault('content''')
        accept_kw.setdefault('content_type''')
        receiver = kw.pop('receiver', self.receiver)

        kw.setdefault('content''')
        kw.setdefault('content_type''')
        receiver.respond(**kw)
        self.sender.accept_response(receiver.response_header, **accept_kw)

        return receiver.response_header

    @raises(InvalidCredentials)
    def test_invalid_credentials_lookup(self):
        # Return invalid credentials.
        self.receive(credentials_map=lambda *a: {})

    def test_get_ok(self):
        method = 'GET'
        self.receive(method=method)
        self.respond()

    def test_post_ok(self):
        method = 'POST'
        self.receive(method=method)
        self.respond()

    @raises(MisComputedContentHash)
    def test_respond_with_wrong_content(self):
        self.receive()
        self.respond(content='real content',
                     accept_kw=dict(content='TAMPERED WITH'))

    @raises(MisComputedContentHash)
    def test_respond_with_wrong_content_type(self):
        self.receive()
        self.respond(content_type='text/html',
                     accept_kw=dict(content_type='application/json'))

    @raises(MissingAuthorization)
    def test_missing_authorization(self):
        Receiver(lambda id: self.credentials, None'/''GET')

    @raises(MacMismatch)
    def test_respond_with_wrong_url(self):
        self.receive(url='http://fakesite.com')
        wrong_receiver = self.receiver

        self.receive(url='http://realsite.com')

        self.respond(receiver=wrong_receiver)

    @raises(MacMismatch)
    def test_respond_with_wrong_method(self):
        self.receive(method='GET')
        wrong_receiver = self.receiver

        self.receive(method='POST')

        self.respond(receiver=wrong_receiver)

    @raises(MacMismatch)
    def test_respond_with_wrong_nonce(self):
        self.receive(sender_kw=dict(nonce='another-nonce'))
        wrong_receiver = self.receiver

        self.receive()

        # The nonce must match the one sent in the original request.
        self.respond(receiver=wrong_receiver)

    def test_respond_with_unhashed_content(self):
        self.receive()

        self.respond(always_hash_content=False, content=None,
                     content_type=None,
                     accept_kw=dict(accept_untrusted_content=True))

    @raises(TokenExpired)
    def test_respond_with_expired_ts(self):
        self.receive()
        hdr = self.receiver.respond(content='', content_type='')

        with mock.patch('mohawk.base.utc_now'as fn:
            fn.return_value = 0  # force an expiry
            try:
                self.sender.accept_response(hdr, content='', content_type='')
            except TokenExpired:
                etype, exc, tb = sys.exc_info()
                hdr = parse_authorization_header(exc.www_authenticate)
                calculated = calculate_ts_mac(fn(), self.credentials)
                if isinstance(calculated, six.binary_type):
                    calculated = calculated.decode('ascii')
                eq_(hdr['tsm'], calculated)
                raise

    def test_respond_with_bad_ts_skew_ok(self):
        now = utc_now() - 120

        self.receive()
        hdr = self.receiver.respond(content='', content_type='')

        with mock.patch('mohawk.base.utc_now'as fn:
            fn.return_value = now

            # Without an offset this will raise an expired exception.
            self.sender.accept_response(hdr, content='', content_type='',
                                        timestamp_skew_in_seconds=120)

    def test_respond_with_ext(self):
        self.receive()

        ext = 'custom-ext'
        self.respond(ext=ext)
        header = parse_authorization_header(self.receiver.response_header)
        eq_(header['ext'], ext)

    @raises(MacMismatch)
    def test_respond_with_wrong_app(self):
        self.receive(sender_kw=dict(app='TAMPERED-WITH', dlg='delegation'))
        self.receiver.respond(content='', content_type='')
        wrong_receiver = self.receiver

        self.receive(sender_kw=dict(app='real-app', dlg='delegation'))

        self.sender.accept_response(wrong_receiver.response_header,
                                    content='', content_type='')

    @raises(MacMismatch)
    def test_respond_with_wrong_dlg(self):
        self.receive(sender_kw=dict(app='app', dlg='TAMPERED-WITH'))
        self.receiver.respond(content='', content_type='')
        wrong_receiver = self.receiver

        self.receive(sender_kw=dict(app='app', dlg='real-dlg'))

        self.sender.accept_response(wrong_receiver.response_header,
                                    content='', content_type='')

    @raises(MacMismatch)
    def test_receive_wrong_method(self):
        self.receive(method='GET')
        wrong_sender = self.sender
        self.receive(method='POST', sender=wrong_sender)

    @raises(MacMismatch)
    def test_receive_wrong_url(self):
        self.receive(url='http://fakesite.com/')
        wrong_sender = self.sender
        self.receive(url='http://realsite.com/', sender=wrong_sender)

    @raises(MisComputedContentHash)
    def test_receive_wrong_content(self):
        self.receive(sender_kw=dict(content='real request'),
                     content='real request')
        wrong_sender = self.sender
        self.receive(content='TAMPERED WITH', sender=wrong_sender)

    def test_expected_unhashed_empty_content(self):
        # This test sets up a scenario where the receiver will receive empty
        # strings for content and content_type and no content hash in the auth
        # header.
        # This is to account for callers that might provide empty strings for
        # the payload when in fact there is literally no content. In this case,
        # mohawk depends on the presence of the content hash in the auth header
        # to determine how to treat the empty strings: no hash in the header
        # implies that no hashing is expected to occur on the server.
        self.receive(content='',
                     content_type='',
                     sender_kw=dict(content=EmptyValue,
                                    content_type=EmptyValue,
                                    always_hash_content=False))

    @raises(MisComputedContentHash)
    def test_expected_unhashed_empty_content_with_content_type(self):
        # This test sets up a scenario where the receiver will receive an
        # empty content string and no content hash in the auth header, but
        # some value for content_type.
        # This is to confirm that the hash is calculated and compared (to the
        # hash of mock empty payload, which should fail) when it appears that
        # the sender has sent a 0-length payload body.
        self.receive(content='',
                     content_type='text/plain',
                     sender_kw=dict(content=EmptyValue,
                                    content_type=EmptyValue,
                                    always_hash_content=False))

    @raises(MisComputedContentHash)
    def test_expected_unhashed_content_with_empty_content_type(self):
        # This test sets up a scenario where the receiver will receive some
        # content but the empty string for the content_type and no content hash
        # in the auth header.
        # This is to confirm that the hash is calculated and compared (to the
        # hash of mock empty payload, which should fail) when the sender has
        # sent unhashed content.
        self.receive(content='some content',
                     content_type='',
                     sender_kw=dict(content=EmptyValue,
                                    content_type=EmptyValue,
                                    always_hash_content=False))

    def test_empty_content_with_content_type(self):
        # This test sets up a scenario where the receiver will receive an
        # empty content string, some value for content_type and a content hash.
        # This is to confirm that the hash is calculated and compared correctly
        # when the sender has sent a hashed 0-length payload body.
        self.receive(content='',
                     content_type='text/plain',
                     sender_kw=dict(content='',
                                    content_type='text/plain'))

    def test_expected_unhashed_no_content(self):
        # This test sets up a scenario where the receiver will receive None for
        # content and content_type and no content hash in the auth header.
        # This is like test_expected_unhashed_empty_content(), but tests for
        # the less ambiguous case where the caller has explicitly passed in None
        # to indicate that there is no content to hash.
        self.receive(content=None,
                     content_type=None,
                     sender_kw=dict(content=EmptyValue,
                                    content_type=EmptyValue,
                                    always_hash_content=False))

    @raises(MisComputedContentHash)
    def test_expected_unhashed_no_content_with_content_type(self):
        # This test sets up a scenario where the receiver will receive None for
        # content and no content hash in the auth header, but some value for
        # content_type.
        # In this case, the content will be coerced to the empty string for
        # hashing purposes. The request should fail, as the there is no content
        # hash in the request to compare against. While this may not be in
        # accordance with the js reference spec, it's the safest (ie. most
        # secure) way of handling this bizarre set of circumstances.
        self.receive(content=None,
                     content_type='text/plain',
                     sender_kw=dict(content=EmptyValue,
                                    content_type=EmptyValue,
                                    always_hash_content=False))

    @raises(MisComputedContentHash)
    def test_expected_unhashed_content_with_no_content_type(self):
        # This test sets up a scenario where the receiver will receive some
        # content but no value for the content_type and no content hash in
        # the auth header.
        # This is to confirm that the hash is calculated and compared (to the
        # hash of mock empty payload, which should fail) when the sender has
        # sent unhashed content.
        self.receive(content='some content',
                     content_type=None,
                     sender_kw=dict(content=EmptyValue,
                                    content_type=EmptyValue,
                                    always_hash_content=False))

    def test_no_content_with_content_type(self):
        # This test sets up a scenario where the receiver will receive None for
        # the content string, some value for content_type and a content hash.
        # This is to confirm that coercing None to the empty string when a hash
        # is expected allows the hash to be calculated and compared correctly
        # as if the sender has sent a hashed 0-length payload body.
        self.receive(content=None,
                     content_type='text/plain',
                     sender_kw=dict(content='',
                                    content_type='text/plain'))

    @raises(MissingContent)
    def test_cannot_receive_empty_content_only(self):
        content_type = 'text/plain'
        self.receive(sender_kw=dict(content='',
                                    content_type=content_type),
                     content=EmptyValue, content_type=content_type)

    @raises(MissingContent)
    def test_cannot_receive_empty_content_type_only(self):
        content = ''
        self.receive(sender_kw=dict(content=content,
                                    content_type='text/plain'),
                     content=content, content_type=EmptyValue)

    @raises(MisComputedContentHash)
    def test_receive_wrong_content_type(self):
        self.receive(sender_kw=dict(content_type='text/html'),
                     content_type='text/html')
        wrong_sender = self.sender

        self.receive(content_type='application/json',
                     sender=wrong_sender)


class TestSendAndReceive(Base):

    def test(self):
        credentials = {
            'id''some-id',
            'key''some secret',
            'algorithm''sha256'
        }

        url = 'https://my-site.com/'
        method = 'POST'

        # The client sends a request with a Hawk header.
        content = 'foo=bar&baz=nooz'
        content_type = 'application/x-www-form-urlencoded'

        sender = Sender(credentials,
                        url, method,
                        content=content,
                        content_type=content_type)

        # The server receives a request and authorizes access.
        receiver = Receiver(lambda id: credentials,
                            sender.request_header,
                            url, method,
                            content=content,
                            content_type=content_type)

        # The server responds with a similar Hawk header.
        content = 'we are friends'
        content_type = 'text/plain'
        receiver.respond(content=content,
                         content_type=content_type)

        # The client receives a response and authorizes access.
        sender.accept_response(receiver.response_header,
                               content=content,
                               content_type=content_type)


class TestBewit(Base):

    # Test cases copied from
    https://github.com/hueniverse/hawk/blob/492632da51ecedd5f59ce96f081860ad24ce6532/test/uri.js

    def setUp(self):
        self.credentials = {
            'id''123456',
            'key''2983d45yun89q',
            'algorithm''sha256',
        }

    def make_credential_lookup(self, credentials_map):
        # Helper function to make a lookup function given a dictionary of
        # credentials
        def lookup(client_id):
            # Will raise a KeyError if missing; which is a subclass of
            # LookupError
            return credentials_map[client_id]
        return lookup

    def test_bewit(self):
        res = Resource(url='https://example.com/somewhere/over/the/rainbow',
                       method='GET', credentials=self.credentials,
                       timestamp=1356420407 + 300,
                       nonce='',
                       )
        bewit = get_bewit(res)

        expected = '123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\'
        eq_(b64decode(bewit).decode('ascii'), expected)

    def test_bewit_with_binary_id(self):
        # Check for exceptions in get_bewit call with binary id
        binary_credentials = self.credentials.copy()
        binary_credentials['id'] = binary_credentials['id'].encode('ascii')
        res = Resource(url='https://example.com/somewhere/over/the/rainbow',
                       method='GET', credentials=binary_credentials,
                       timestamp=1356420407 + 300,
                       nonce='',
                       )
        get_bewit(res)

    def test_bewit_with_ext(self):
        res = Resource(url='https://example.com/somewhere/over/the/rainbow',
                       method='GET', credentials=self.credentials,
                       timestamp=1356420407 + 300,
                       nonce='',
                       ext='xandyandz'
                       )
        bewit = get_bewit(res)

        expected = '123456\\1356420707\\kscxwNR2tJpP1T1zDLNPbB5UiKIU9tOSJXTUdG7X9h8=\\xandyandz'
        eq_(b64decode(bewit).decode('ascii'), expected)

    @raises(BadHeaderValue)
    def test_bewit_with_invalid_ext(self):
        res = Resource(url='https://example.com/somewhere/over/the/rainbow',
                       method='GET', credentials=self.credentials,
                       timestamp=1356420407 + 300,
                       nonce='',
                       ext='xand\\yandz')
        get_bewit(res)

    @raises(BadHeaderValue)
    def test_bewit_with_backslashes_in_id(self):
        credentials = self.credentials
        credentials['id'] = '123\\456'
        res = Resource(url='https://example.com/somewhere/over/the/rainbow',
                       method='GET', credentials=self.credentials,
                       timestamp=1356420407 + 300,
                       nonce='')
        get_bewit(res)

    def test_bewit_with_port(self):
        res = Resource(url='https://example.com:8080/somewhere/over/the/rainbow',
                       method='GET', credentials=self.credentials,
                       timestamp=1356420407 + 300, nonce='', ext='xandyandz')
        bewit = get_bewit(res)

        expected = '123456\\1356420707\\hZbJ3P2cKEo4ky0C8jkZAkRyCZueg4WSNbxV7vq3xHU=\\xandyandz'
        eq_(b64decode(bewit).decode('ascii'), expected)

    @raises(ValueError)
    def test_bewit_with_nonce(self):
        res = Resource(url='https://example.com/somewhere/over/the/rainbow',
                       method='GET', credentials=self.credentials,
                       timestamp=1356420407 + 300,
                       nonce='n1')
        get_bewit(res)

    @raises(ValueError)
    def test_bewit_invalid_method(self):
        res = Resource(url='https://example.com:8080/somewhere/over/the/rainbow',
                       method='POST', credentials=self.credentials,
                       timestamp=1356420407 + 300, nonce='')
        get_bewit(res)

    def test_strip_bewit(self):
        bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\'
        bewit = urlsafe_b64encode(bewit).decode('ascii')
        url = "https://example.com/somewhere/over/the/rainbow?bewit={bewit}".format(bewit=bewit)

        raw_bewit, stripped_url = strip_bewit(url)
        self.assertEqual(raw_bewit, bewit)
        self.assertEqual(stripped_url, "https://example.com/somewhere/over/the/rainbow")

    @raises(InvalidBewit)
    def test_strip_url_without_bewit(self):
        url = "https://example.com/somewhere/over/the/rainbow"
        strip_bewit(url)

    def test_parse_bewit(self):
        bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\'
        bewit = urlsafe_b64encode(bewit).decode('ascii')
        bewit = parse_bewit(bewit)
        self.assertEqual(bewit.id, '123456')
        self.assertEqual(bewit.expiration, '1356420707')
        self.assertEqual(bewit.mac, 'IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=')
        self.assertEqual(bewit.ext, '')

    def test_parse_bewit_with_ext(self):
        bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\xandyandz'
        bewit = urlsafe_b64encode(bewit).decode('ascii')
        bewit = parse_bewit(bewit)
        self.assertEqual(bewit.id, '123456')
        self.assertEqual(bewit.expiration, '1356420707')
        self.assertEqual(bewit.mac, 'IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=')
        self.assertEqual(bewit.ext, 'xandyandz')

    @raises(InvalidBewit)
    def test_parse_bewit_with_ext_and_backslashes(self):
        bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\xand\\yandz'
        bewit = urlsafe_b64encode(bewit).decode('ascii')
        parse_bewit(bewit)

    @raises(InvalidBewit)
    def test_parse_invalid_bewit_with_only_one_part(self):
        bewit = b'12345'
        bewit = urlsafe_b64encode(bewit).decode('ascii')
        bewit = parse_bewit(bewit)

    @raises(InvalidBewit)
    def test_parse_invalid_bewit_with_only_two_parts(self):
        bewit = b'1\\2'
        bewit = urlsafe_b64encode(bewit).decode('ascii')
        bewit = parse_bewit(bewit)

    def test_validate_bewit(self):
        bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\'
        bewit = urlsafe_b64encode(bewit).decode('ascii')
        url = "https://example.com/somewhere/over/the/rainbow?bewit={bewit}".format(bewit=bewit)
        credential_lookup = self.make_credential_lookup({
            self.credentials['id']: self.credentials,
        })
        self.assertTrue(check_bewit(url, credential_lookup=credential_lookup, now=1356420407 + 10))

    def test_validate_bewit_with_ext(self):
        bewit = b'123456\\1356420707\\kscxwNR2tJpP1T1zDLNPbB5UiKIU9tOSJXTUdG7X9h8=\\xandyandz'
        bewit = urlsafe_b64encode(bewit).decode('ascii')
        url = "https://example.com/somewhere/over/the/rainbow?bewit={bewit}".format(bewit=bewit)
        credential_lookup = self.make_credential_lookup({
            self.credentials['id']: self.credentials,
        })
        self.assertTrue(check_bewit(url, credential_lookup=credential_lookup, now=1356420407 + 10))

    @raises(InvalidBewit)
    def test_validate_bewit_with_ext_and_backslashes(self):
        bewit = b'123456\\1356420707\\b82LLIxG5UDkaChLU953mC+SMrbniV1sb8KiZi9cSsc=\\xand\\yandz'
        bewit = urlsafe_b64encode(bewit).decode('ascii')
        url = "https://example.com/somewhere/over/the/rainbow?bewit={bewit}".format(bewit=bewit)
        credential_lookup = self.make_credential_lookup({
            self.credentials['id']: self.credentials,
        })
        check_bewit(url, credential_lookup=credential_lookup, now=1356420407 + 10)

    @raises(TokenExpired)
    def test_validate_expired_bewit(self):
        bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\'
        bewit = urlsafe_b64encode(bewit).decode('ascii')
        url = "https://example.com/somewhere/over/the/rainbow?bewit={bewit}".format(bewit=bewit)
        credential_lookup = self.make_credential_lookup({
            self.credentials['id']: self.credentials,
        })
        check_bewit(url, credential_lookup=credential_lookup, now=1356420407 + 1000)

    @raises(CredentialsLookupError)
    def test_validate_bewit_with_unknown_credentials(self):
        bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\'
        bewit = urlsafe_b64encode(bewit).decode('ascii')
        url = "https://example.com/somewhere/over/the/rainbow?bewit={bewit}".format(bewit=bewit)
        credential_lookup = self.make_credential_lookup({
            'other_id': self.credentials,
        })
        check_bewit(url, credential_lookup=credential_lookup, now=1356420407 + 10)


class TestPayloadHash(Base):
    def test_hash_file_read_blocks(self):
        payload = six.BytesIO(b"\x00\xffhello world\xff\x00")
        h1 = calculate_payload_hash(payload, 'sha256''application/json', block_size=1)
        payload.seek(0)
        h2 = calculate_payload_hash(payload, 'sha256''application/json', block_size=1024)
        self.assertEqual(h1, h2)

100%


¤ Dauer der Verarbeitung: 0.5 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung ist noch experimentell.






                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge