summaryrefslogtreecommitdiff
diff options
authorroland <roland@catalogix.se>2016-12-19 15:01:39 +0100
committerroland <roland@catalogix.se>2016-12-19 15:01:39 +0100
commit6dd2278c81d0f3fea5512d6e8639afc89f6a5ca4 (patch)
tree2f24446e120a5f216d9686b35b3e526ba6c68889
parent0d5394ae172412cc27de4c4e14f1877d660d1b16 (diff)
Making federated client request and handling such a request now works.v0.9.4
-rw-r--r--src/oic/extension/fed_client.py34
-rw-r--r--src/oic/extension/fed_provider.py5
-rw-r--r--tests/test_oic_fed_client.py110
-rw-r--r--tests/test_oic_fed_entities.py207
4 files changed, 230 insertions, 126 deletions
diff --git a/src/oic/extension/fed_client.py b/src/oic/extension/fed_client.py
index ad1b2b52..225391e9 100644
--- a/src/oic/extension/fed_client.py
+++ b/src/oic/extension/fed_client.py
@@ -79,6 +79,25 @@ class Client(oic.Client, FederationEntity):
return resp
+ def federated_client_registration_request(self, **kwargs):
+ req = ClientMetadataStatement()
+
+ try:
+ pp = kwargs['fo_pattern']
+ except KeyError:
+ pp = '.'
+ req['metadata_statements'] = self.pick_signed_metadata_statements(pp)
+
+ try:
+ req['redirect_uris'] = kwargs['redirect_uris']
+ except KeyError:
+ try:
+ req["redirect_uris"] = self.redirect_uris
+ except AttributeError:
+ raise MissingRequiredAttribute("redirect_uris", kwargs)
+
+ return req
+
def register(self, url, **kwargs):
try:
reg_type = kwargs['registration_type']
@@ -86,20 +105,7 @@ class Client(oic.Client, FederationEntity):
reg_type = 'core'
if reg_type == 'federation':
- req = ClientMetadataStatement()
-
- try:
- pp = kwargs['fo_pattern']
- except KeyError:
- pp = '.'
- req['metadata_statements'] = self.pick_signed_metadata_statements(pp)
-
- if "redirect_uris" not in kwargs:
- try:
- req["redirect_uris"] = self.redirect_uris
- except AttributeError:
- raise MissingRequiredAttribute("redirect_uris", kwargs)
-
+ req = self.federated_client_registration_request(**kwargs)
else:
req = self.create_registration_request(**kwargs)
diff --git a/src/oic/extension/fed_provider.py b/src/oic/extension/fed_provider.py
index 254ff339..7e916cfb 100644
--- a/src/oic/extension/fed_provider.py
+++ b/src/oic/extension/fed_provider.py
@@ -3,7 +3,7 @@ import logging
from oic.oic import provider
from oic.extension.oidc_fed import ClientMetadataStatement
from oic.extension.oidc_fed import FederationEntity
-from oic.oic.message import OpenIDSchema
+from oic.oic.message import OpenIDSchema, RegistrationRequest
from oic.utils.http_util import Created
from oic.utils.http_util import Response
from oic.utils.sanitize import sanitize
@@ -46,7 +46,8 @@ class Provider(provider.Provider, FederationEntity):
logger.info("registration_request:%s" % sanitize(request.to_dict()))
- request = self.get_metadata_statement(request)
+ request_args = self.get_metadata_statement(request)
+ request = RegistrationRequest(**request_args)
result = self.client_registration_setup(request)
if isinstance(result, Response):
diff --git a/tests/test_oic_fed_client.py b/tests/test_oic_fed_client.py
deleted file mode 100644
index ce0387bc..00000000
--- a/tests/test_oic_fed_client.py
+++ /dev/null
@@ -1,110 +0,0 @@
-import os
-
-import pytest
-from jwkest.jwk import rsa_load
-from oic.extension.oidc_fed import Operator, ClientMetadataStatement
-
-from oic.oic import DEF_SIGN_ALG
-
-from oic.utils.keyio import KeyBundle, build_keyjar, KeyJar
-
-from oic.extension.fed_client import Client
-from oic.utils.authn.client import CLIENT_AUTHN_METHOD
-
-BASE_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "data/keys"))
-_key = rsa_load(os.path.join(BASE_PATH, "rsa.key"))
-KC_RSA = KeyBundle({"key": _key, "kty": "RSA", "use": "sig"})
-
-CLIENT_ID = "client_1"
-
-KEYDEFS = [
- {"type": "RSA", "key": '', "use": ["sig"]},
- {"type": "EC", "crv": "P-256", "use": ["sig"]}
-]
-
-KEYS = {}
-ISSUER = {}
-OPERATOR = {}
-
-for entity in ['fo', 'fo1', 'org', 'inter', 'admin', 'ligo', 'op']:
- fname = os.path.join(BASE_PATH, "{}.key".format(entity))
- _keydef = KEYDEFS[:]
- _keydef[0]['key'] = fname
-
- _jwks, _keyjar, _kidd = build_keyjar(_keydef)
- KEYS[entity] = {'jwks': _jwks, 'keyjar': _keyjar, 'kidd': _kidd}
- ISSUER[entity] = 'https://{}.example.org'.format(entity)
- OPERATOR[entity] = Operator(keyjar=_keyjar, iss=ISSUER[entity], jwks=_jwks)
-
-FOP = OPERATOR['fo']
-FOP.fo_keyjar = FOP.keyjar
-FO1P = OPERATOR['fo1']
-FO1P.fo_keyjar = FO1P.keyjar
-ORGOP = OPERATOR['org']
-ADMINOP = OPERATOR['admin']
-INTEROP = OPERATOR['inter']
-LIGOOP = OPERATOR['ligo']
-OPOP = OPERATOR['op']
-
-
-def fo_member(*args):
- _kj = KeyJar()
- for fo in args:
- _kj.import_jwks(fo.jwks, fo.iss)
-
- return Operator(fo_keyjar=_kj)
-
-
-def create_compound_metadata_statement(spec):
- _ms = None
- for op, op_args, signer, sig_args in spec:
- _cms = ClientMetadataStatement(signing_keys=op.jwks, **op_args)
- if _ms:
- sig_args['metadata_statements'] = [_ms]
- _ms = signer.pack_metadata_statement(_cms, **sig_args)
- return _ms
-
-SPEC = [
- [ORGOP, {'contacts':['info@example.com']},
- FOP, {'alg':'RS256', 'scope':['openid']}],
- [INTEROP, {'tos_uri':['https://inter.example.com/tos.html']},
- ORGOP, {'alg':'RS256'}],
- [ADMINOP, {'redirect_uris':['https://rp.example.com/auth_cb']},
- INTEROP, {'alg':'RS256'}]
-]
-
-
-class TestClient(object):
- @pytest.fixture(autouse=True)
- def create_client(self):
- sms = [create_compound_metadata_statement(SPEC)]
- self.redirect_uri = "http://example.com/redirect"
- self.client = Client(CLIENT_ID,
- client_authn_method=CLIENT_AUTHN_METHOD,
- fo_keyjar=fo_member(FOP, FO1P).fo_keyjar,
- signed_metadata_statements=sms,
- fo_priority_order=[FOP.iss, FO1P.iss]
- )
- self.client.redirect_uris = [self.redirect_uri]
- self.client.authorization_endpoint = \
- "http://example.com/authorization"
- self.client.token_endpoint = "http://example.com/token"
- self.client.userinfo_endpoint = "http://example.com/userinfo"
- self.client.client_secret = "abcdefghijklmnop"
- self.client.keyjar[""] = KC_RSA
- self.client.behaviour = {
- "request_object_signing_alg": DEF_SIGN_ALG[
- "openid_request_object"]}
-
- def test_init(self):
- receiver = fo_member(FOP, FO1P)
- ms = receiver.unpack_metadata_statement(
- jwt_ms=self.client.signed_metadata_statements[0])
- res = receiver.evaluate_metadata_statement(ms)
- assert FOP.iss in res
-
- def test_create_registration_request(self):
- self.client.create_registration_request(
- redirect_uris=['https://rp.example.com/auth_cb'],
- metadata_statements=[self.client.signed_metadata_statements]
- ) \ No newline at end of file
diff --git a/tests/test_oic_fed_entities.py b/tests/test_oic_fed_entities.py
new file mode 100644
index 00000000..fb140ad7
--- /dev/null
+++ b/tests/test_oic_fed_entities.py
@@ -0,0 +1,207 @@
+import json
+import os
+from time import time
+
+import pytest
+from jwkest.jwk import rsa_load
+from oic.oic.message import RegistrationResponse
+
+from oic import rndstr
+from oic.oic import DEF_SIGN_ALG
+from oic.extension.fed_client import Client
+from oic.extension.fed_provider import Provider
+from oic.extension.oidc_fed import ClientMetadataStatement
+from oic.extension.oidc_fed import Operator
+from oic.utils.authn.authn_context import AuthnBroker
+from oic.utils.authn.client import CLIENT_AUTHN_METHOD
+from oic.utils.authn.client import verify_client
+from oic.utils.authn.user import UserAuthnMethod
+from oic.utils.authz import AuthzHandling
+from oic.utils.keyio import build_keyjar
+from oic.utils.keyio import KeyBundle
+from oic.utils.keyio import KeyJar
+from oic.utils.sdb import SessionDB
+from oic.utils.userinfo import UserInfo
+
+BASE_PATH = os.path.abspath(
+ os.path.join(os.path.dirname(__file__), "data/keys"))
+_key = rsa_load(os.path.join(BASE_PATH, "rsa.key"))
+KC_RSA = KeyBundle({"key": _key, "kty": "RSA", "use": "sig"})
+
+CLIENT_ID = "client_1"
+
+KEYDEFS = [
+ {"type": "RSA", "key": '', "use": ["sig"]},
+ {"type": "EC", "crv": "P-256", "use": ["sig"]}
+]
+
+CONSUMER_CONFIG = {
+ "authz_page": "/authz",
+ "scope": ["openid"],
+ "response_type": ["code"],
+ "user_info": {
+ "name": None,
+ "email": None,
+ "nickname": None
+ },
+ "request_method": "param"
+}
+
+SERVER_INFO = {
+ "version": "3.0",
+ "issuer": "https://connect-op.heroku.com",
+ "authorization_endpoint": "http://localhost:8088/authorization",
+ "token_endpoint": "http://localhost:8088/token",
+ "flows_supported": ["code", "token", "code token"],
+}
+
+BASE_PATH = os.path.abspath(
+ os.path.join(os.path.dirname(__file__), "data/keys"))
+
+KEYJAR = KeyJar()
+KEYJAR[""] = KC_RSA
+
+CDB = {}
+
+USERDB = {
+ "username": {
+ "name": "Linda Lindgren",
+ "nickname": "Linda",
+ "email": "linda@example.com",
+ "verified": True,
+ "sub": "username"
+ }
+}
+
+URLMAP = {CLIENT_ID: ["https://example.com/authz"]}
+
+
+def _eq(l1, l2):
+ return set(l1) == set(l2)
+
+
+class DummyAuthn(UserAuthnMethod):
+ def __init__(self, srv, user):
+ UserAuthnMethod.__init__(self, srv)
+ self.user = user
+
+ def authenticated_as(self, cookie=None, **kwargs):
+ if cookie == "FAIL":
+ return None, 0
+ else:
+ return {"uid": self.user}, time()
+
+
+# AUTHN = UsernamePasswordMako(None, "login.mako", tl, PASSWD, "authenticated")
+AUTHN_BROKER = AuthnBroker()
+AUTHN_BROKER.add("UNDEFINED", DummyAuthn(None, "username"))
+
+# dealing with authorization
+AUTHZ = AuthzHandling()
+SYMKEY = rndstr(16) # symmetric key used to encrypt cookie info
+USERINFO = UserInfo(USERDB)
+
+KEYS = {}
+ISSUER = {}
+OPERATOR = {}
+
+for entity in ['fo', 'fo1', 'org', 'inter', 'admin', 'ligo', 'op']:
+ fname = os.path.join(BASE_PATH, "{}.key".format(entity))
+ _keydef = KEYDEFS[:]
+ _keydef[0]['key'] = fname
+
+ _jwks, _keyjar, _kidd = build_keyjar(_keydef)
+ KEYS[entity] = {'jwks': _jwks, 'keyjar': _keyjar, 'kidd': _kidd}
+ ISSUER[entity] = 'https://{}.example.org'.format(entity)
+ OPERATOR[entity] = Operator(keyjar=_keyjar, iss=ISSUER[entity], jwks=_jwks)
+
+FOP = OPERATOR['fo']
+FOP.fo_keyjar = FOP.keyjar
+FO1P = OPERATOR['fo1']
+FO1P.fo_keyjar = FO1P.keyjar
+ORGOP = OPERATOR['org']
+ADMINOP = OPERATOR['admin']
+INTEROP = OPERATOR['inter']
+LIGOOP = OPERATOR['ligo']
+OPOP = OPERATOR['op']
+
+
+def fo_keyjar(*args):
+ _kj = KeyJar()
+ for fo in args:
+ _kj.import_jwks(fo.jwks, fo.iss)
+ return _kj
+
+def fo_member(*args):
+ return Operator(fo_keyjar=fo_keyjar(*args))
+
+
+def create_compound_metadata_statement(spec):
+ _ms = None
+ root_signer = ''
+ for op, op_args, signer, sig_args in spec:
+ _cms = ClientMetadataStatement(signing_keys=op.jwks, **op_args)
+ if _ms:
+ sig_args['metadata_statements'] = [_ms]
+ else: # root signed
+ root_signer = signer.iss
+ _ms = signer.pack_metadata_statement(_cms, **sig_args)
+ return root_signer, _ms
+
+
+SPEC = [
+ [ORGOP, {'contacts': ['info@example.com']},
+ FOP, {'alg': 'RS256', 'scope': ['openid']}],
+ [INTEROP, {'tos_uri': ['https://rp.example.com/tos.html']},
+ ORGOP, {'alg': 'RS256'}],
+ [ADMINOP, {'redirect_uris': ['https://rp.example.com/auth_cb']},
+ INTEROP, {'alg': 'RS256'}]
+]
+
+
+class TestClient(object):
+ @pytest.fixture(autouse=True)
+ def create_client(self):
+ signer, ms = create_compound_metadata_statement(SPEC)
+ sms = {signer: [ms]}
+
+ self.redirect_uri = "http://example.com/redirect"
+ self.client = Client(CLIENT_ID,
+ client_authn_method=CLIENT_AUTHN_METHOD,
+ fo_keyjar=fo_member(FOP, FO1P).fo_keyjar,
+ signed_metadata_statements=sms,
+ fo_priority_order=[FOP.iss, FO1P.iss]
+ )
+ self.client.redirect_uris = [self.redirect_uri]
+ self.client.authorization_endpoint = \
+ "http://example.com/authorization"
+ self.client.token_endpoint = "http://example.com/token"
+ self.client.userinfo_endpoint = "http://example.com/userinfo"
+ self.client.client_secret = "abcdefghijklmnop"
+ self.client.keyjar[""] = KC_RSA
+ self.client.behaviour = {
+ "request_object_signing_alg": DEF_SIGN_ALG[
+ "openid_request_object"]}
+
+ self.provider = Provider(
+ SERVER_INFO["issuer"], SessionDB(SERVER_INFO["issuer"]), CDB,
+ AUTHN_BROKER, USERINFO, AUTHZ, verify_client, SYMKEY, urlmap=URLMAP,
+ keyjar=KEYJAR, fo_keyjar=fo_keyjar(FOP, FO1P),
+ fo_priority_order=[FOP.iss, FO1P.iss])
+ self.provider.baseurl = self.provider.name
+
+ def test_init(self):
+ receiver = fo_member(FOP, FO1P)
+ ms = receiver.unpack_metadata_statement(
+ jwt_ms=self.client.signed_metadata_statements[FOP.iss][0])
+ res = receiver.evaluate_metadata_statement(ms)
+ assert FOP.iss in res
+
+ def test_create_registration_request(self):
+ req = self.client.federated_client_registration_request(
+ redirect_uris=['https://rp.example.com/auth_cb']
+ )
+ msg = self.provider.registration_endpoint(req.to_json())
+ assert msg.status == '201 Created'
+ reqresp = RegistrationResponse(**json.loads(msg.message))
+ assert reqresp['response_types'] == ['code'] \ No newline at end of file