summaryrefslogtreecommitdiff
path: root/ick2/client_tests.py
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2018-04-06 21:48:07 +0300
committerLars Wirzenius <liw@liw.fi>2018-04-07 16:53:19 +0300
commit965f8816c8637bd7441bafd3f2a606664a74e56c (patch)
tree7304a230ef23105a2f3b9932a130df517fb02eae /ick2/client_tests.py
parent2d0d514d1f86e58d965160c28a4954da33b10baf (diff)
downloadick2-965f8816c8637bd7441bafd3f2a606664a74e56c.tar.gz
Add: AuthClient
Diffstat (limited to 'ick2/client_tests.py')
-rw-r--r--ick2/client_tests.py98
1 files changed, 96 insertions, 2 deletions
diff --git a/ick2/client_tests.py b/ick2/client_tests.py
index 92da36a..f164e6a 100644
--- a/ick2/client_tests.py
+++ b/ick2/client_tests.py
@@ -97,6 +97,30 @@ class HttpAPITests(unittest.TestCase):
obj = self.client.put('http://controller/work', body=blob)
self.assertEqual(obj, None)
+ def test_post_auth_does_basic_auth(self):
+ token = 'this is a token'
+ scope = 'this and that'
+ response = {
+ 'access_token': token,
+ 'token_type': 'bearer',
+ 'scope': scope,
+ }
+ self.session.response = FakeResponse(200, body=response)
+
+ client_id = 'this-is--my-client'
+ client_secret = '*****'
+ auth = (client_id, client_secret)
+ body = 'foo=bar'
+ self.client.post_auth(
+ 'http://auth.example.com/token', auth=auth, body=body)
+
+ self.assertEqual(self.session.auth, auth)
+
+ authz = self.session.headers['Authorization']
+ self.assertTrue(authz.startswith('Basic '))
+
+ self.assertEqual(self.session.body, body)
+
class ControllerClientTests(unittest.TestCase):
@@ -164,6 +188,70 @@ class ControllerClientTests(unittest.TestCase):
200, body=json.dumps(version), content_type=json_type)
self.assertEqual(self.controller.get_artifact_store_url(), url)
+ def test_get_auth_url_raises_exception_on_error(self):
+ self.session.response = FakeResponse(400)
+ with self.assertRaises(ick2.HttpError):
+ self.controller.get_auth_url()
+
+ def test_get_auth_url_succeeds(self):
+ url = 'https://blobs'
+ version = {
+ 'auth_url': url,
+ }
+ self.session.response = FakeResponse(
+ 200, body=json.dumps(version), content_type=json_type)
+ self.assertEqual(self.controller.get_auth_url(), url)
+
+ def test_get_auth_client_returns_object(self):
+ url = 'https://blobs'
+ version = {
+ 'auth_url': url,
+ }
+ self.session.response = FakeResponse(
+ 200, body=json.dumps(version), content_type=json_type)
+ ac = self.controller.get_auth_client()
+ self.assertTrue(isinstance(ac, ick2.AuthClient))
+
+
+class AuthClientTests(unittest.TestCase):
+
+ def setUp(self):
+ self.session = FakeHttpSession()
+
+ self.client = ick2.HttpAPI()
+ self.client.set_session(self.session)
+
+ def test_raises_exception_on_error(self):
+ self.session.response = FakeResponse(400)
+
+ url = 'https://auth.example.com'
+ client_id = 'test-client'
+ client_secret = 'hunter2'
+ ac = ick2.AuthClient()
+ ac.set_auth_url(url)
+ ac.set_http_api(self.client)
+ ac.set_client_creds(client_id, client_secret)
+ with self.assertRaises(ick2.HttpError):
+ ac.get_token('')
+
+ def test_returns_token(self):
+ token = 'this-is-my-token'
+ token_response = {
+ 'access_token': token,
+ }
+
+ self.session.response = FakeResponse(
+ 200, body=json.dumps(token_response), content_type=json_type)
+
+ url = 'https://auth.example.com'
+ client_id = 'test-client'
+ client_secret = 'hunter2'
+ ac = ick2.AuthClient()
+ ac.set_auth_url(url)
+ ac.set_http_api(self.client)
+ ac.set_client_creds(client_id, client_secret)
+ self.assertEqual(ac.get_token(''), token)
+
class BlobServiceClientTests(unittest.TestCase):
@@ -231,15 +319,21 @@ class FakeHttpSession:
def __init__(self):
self.response = None
self.token = None
+ self.auth = None
+ self.headers = None
+ self.body = None
def get(self, url, headers=None, verify=None):
assert self.response is not None
assert self.is_authorized(headers)
return self.response
- def post(self, url, headers=None, data=None, verify=None):
+ def post(self, url, headers=None, data=None, verify=None, auth=None):
assert self.response is not None
- assert self.is_authorized(headers)
+ assert auth is not None or self.is_authorized(headers)
+ self.auth = auth
+ self.headers = headers
+ self.body = data
return self.response
def put(self, url, headers=None, data=None, verify=None):