|
|
|
@ -24,14 +24,14 @@ from flask import (
|
|
|
|
|
session,
|
|
|
|
|
url_for,
|
|
|
|
|
)
|
|
|
|
|
from flask.ext.login import (
|
|
|
|
|
LoginManager,
|
|
|
|
|
UserMixin,
|
|
|
|
|
current_user,
|
|
|
|
|
login_required,
|
|
|
|
|
login_user,
|
|
|
|
|
logout_user,
|
|
|
|
|
)
|
|
|
|
|
# from flask.ext.login import (
|
|
|
|
|
# LoginManager,
|
|
|
|
|
# UserMixin,
|
|
|
|
|
# current_user,
|
|
|
|
|
# login_required,
|
|
|
|
|
# login_user,
|
|
|
|
|
# logout_user,
|
|
|
|
|
# )
|
|
|
|
|
from flask_bootstrap import Bootstrap
|
|
|
|
|
from saml2 import (
|
|
|
|
|
BINDING_HTTP_POST,
|
|
|
|
@ -52,6 +52,7 @@ import requests
|
|
|
|
|
metadata_url_for = {
|
|
|
|
|
# For testing with http://saml.oktadev.com use the following:
|
|
|
|
|
# 'test': 'http://idp.oktadev.com/metadata',
|
|
|
|
|
'keycloak': 'https://auth.labcode.de/auth/realms/test/protocol/saml/descriptor'
|
|
|
|
|
# WARNING WARNING WARNING
|
|
|
|
|
# You MUST remove the testing IdP from a production system,
|
|
|
|
|
# as the testing IdP will allow ANYBODY to log in as ANY USER!
|
|
|
|
@ -61,8 +62,8 @@ metadata_url_for = {
|
|
|
|
|
app = Flask(__name__)
|
|
|
|
|
Bootstrap(app)
|
|
|
|
|
app.secret_key = str(uuid.uuid4()) # Replace with your secret key
|
|
|
|
|
login_manager = LoginManager()
|
|
|
|
|
login_manager.setup_app(app)
|
|
|
|
|
# login_manager = LoginManager()
|
|
|
|
|
# login_manager.setup_app(app)
|
|
|
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
|
# NOTE:
|
|
|
|
|
# This is implemented as a dictionary for DEMONSTRATION PURPOSES ONLY.
|
|
|
|
@ -94,6 +95,7 @@ def saml_client_for(idp_name=None):
|
|
|
|
|
rv = requests.get(metadata_url_for[idp_name])
|
|
|
|
|
|
|
|
|
|
settings = {
|
|
|
|
|
'entityid': 'pysaml',
|
|
|
|
|
'metadata': {
|
|
|
|
|
'inline': [rv.text],
|
|
|
|
|
},
|
|
|
|
@ -126,7 +128,7 @@ def saml_client_for(idp_name=None):
|
|
|
|
|
return saml_client
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class User(UserMixin):
|
|
|
|
|
class User():
|
|
|
|
|
def __init__(self, user_id):
|
|
|
|
|
user = {}
|
|
|
|
|
self.id = None
|
|
|
|
@ -140,10 +142,10 @@ class User(UserMixin):
|
|
|
|
|
except:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@login_manager.user_loader
|
|
|
|
|
def load_user(user_id):
|
|
|
|
|
return User(user_id)
|
|
|
|
|
#
|
|
|
|
|
# @login_manager.user_loader
|
|
|
|
|
# def load_user(user_id):
|
|
|
|
|
# return User(user_id)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/")
|
|
|
|
@ -160,18 +162,25 @@ def idp_initiated(idp_name):
|
|
|
|
|
authn_response.get_identity()
|
|
|
|
|
user_info = authn_response.get_subject()
|
|
|
|
|
username = user_info.text
|
|
|
|
|
print('#'*30)
|
|
|
|
|
print('uinfou', user_info)
|
|
|
|
|
print('username', username)
|
|
|
|
|
print('#'*30)
|
|
|
|
|
print('authn',authn_response)
|
|
|
|
|
|
|
|
|
|
# This is what as known as "Just In Time (JIT) provisioning".
|
|
|
|
|
# What that means is that, if a user in a SAML assertion
|
|
|
|
|
# isn't in the user store, we create that user first, then log them in
|
|
|
|
|
if username not in user_store:
|
|
|
|
|
print('#'*30)
|
|
|
|
|
print('AVA',authn_response.ava)
|
|
|
|
|
user_store[username] = {
|
|
|
|
|
'first_name': authn_response.ava['FirstName'][0],
|
|
|
|
|
'last_name': authn_response.ava['LastName'][0],
|
|
|
|
|
'first_name': authn_response.ava.get('FirstName',[''])[0],
|
|
|
|
|
'last_name': authn_response.ava.get('LastName',[''])[0],
|
|
|
|
|
}
|
|
|
|
|
user = User(username)
|
|
|
|
|
session['saml_attributes'] = authn_response.ava
|
|
|
|
|
login_user(user)
|
|
|
|
|
# login_user(user)
|
|
|
|
|
url = url_for('user')
|
|
|
|
|
# NOTE:
|
|
|
|
|
# On a production system, the RelayState MUST be checked
|
|
|
|
@ -205,9 +214,10 @@ def sp_initiated(idp_name):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/user")
|
|
|
|
|
@login_required
|
|
|
|
|
# @login_required
|
|
|
|
|
def user():
|
|
|
|
|
return render_template('user.html', session=session)
|
|
|
|
|
pass
|
|
|
|
|
# return render_template('main_page.html', session=session)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.errorhandler(401)
|
|
|
|
@ -216,7 +226,7 @@ def error_unauthorized(error):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/logout")
|
|
|
|
|
@login_required
|
|
|
|
|
# @login_required
|
|
|
|
|
def logout():
|
|
|
|
|
logout_user()
|
|
|
|
|
return redirect(url_for("main_page"))
|
|
|
|
|