This commit is contained in:
MrMeeb 2024-05-09 12:00:07 +01:00
parent 24ba25ab6a
commit 4e178ad676
15 changed files with 379 additions and 85 deletions

View File

@ -68,6 +68,12 @@ For most tests, you will need to have ``redis`` installed and started on your ma
sh scripts/run-test.sh sh scripts/run-test.sh
``` ```
You can also run tests using a local Postgres DB to speed things up. This can be done by
- creating an empty test DB and running the database migration by `dropdb test && createdb test && DB_URI=postgresql://localhost:5432/test alembic upgrade head`
- replacing the `DB_URI` in `test.env` file by `DB_URI=postgresql://localhost:5432/test`
## Run the code locally ## Run the code locally
Install npm packages Install npm packages

View File

@ -3,11 +3,13 @@ from flask_login import login_user
from app.auth.base import auth_bp from app.auth.base import auth_bp
from app.db import Session from app.db import Session
from app.extensions import limiter
from app.log import LOG from app.log import LOG
from app.models import EmailChange, ResetPasswordCode from app.models import EmailChange, ResetPasswordCode
@auth_bp.route("/change_email", methods=["GET", "POST"]) @auth_bp.route("/change_email", methods=["GET", "POST"])
@limiter.limit("3/hour")
def change_email(): def change_email():
code = request.args.get("code") code = request.args.get("code")

View File

@ -1,14 +1,13 @@
from flask import request, session, redirect, flash, url_for from flask import request, session, redirect, flash, url_for
from requests_oauthlib import OAuth2Session from requests_oauthlib import OAuth2Session
import requests
from app import config from app import config
from app.auth.base import auth_bp from app.auth.base import auth_bp
from app.auth.views.login_utils import after_login from app.auth.views.login_utils import after_login
from app.config import ( from app.config import (
URL, URL,
OIDC_AUTHORIZATION_URL,
OIDC_USER_INFO_URL,
OIDC_TOKEN_URL,
OIDC_SCOPES, OIDC_SCOPES,
OIDC_NAME_FIELD, OIDC_NAME_FIELD,
) )
@ -16,14 +15,15 @@ from app.db import Session
from app.email_utils import send_welcome_email from app.email_utils import send_welcome_email
from app.log import LOG from app.log import LOG
from app.models import User, SocialAuth from app.models import User, SocialAuth
from app.utils import encode_url, sanitize_email, sanitize_next_url from app.utils import sanitize_email, sanitize_next_url
# need to set explicitly redirect_uri instead of leaving the lib to pre-fill redirect_uri # need to set explicitly redirect_uri instead of leaving the lib to pre-fill redirect_uri
# when served behind nginx, the redirect_uri is localhost... and not the real url # when served behind nginx, the redirect_uri is localhost... and not the real url
_redirect_uri = URL + "/auth/oidc/callback" redirect_uri = URL + "/auth/oidc/callback"
SESSION_STATE_KEY = "oauth_state" SESSION_STATE_KEY = "oauth_state"
SESSION_NEXT_KEY = "oauth_redirect_next"
@auth_bp.route("/oidc/login") @auth_bp.route("/oidc/login")
@ -32,18 +32,17 @@ def oidc_login():
return redirect(url_for("auth.login")) return redirect(url_for("auth.login"))
next_url = sanitize_next_url(request.args.get("next")) next_url = sanitize_next_url(request.args.get("next"))
if next_url:
redirect_uri = _redirect_uri + "?next=" + encode_url(next_url) auth_url = requests.get(config.OIDC_WELL_KNOWN_URL).json()["authorization_endpoint"]
else:
redirect_uri = _redirect_uri
oidc = OAuth2Session( oidc = OAuth2Session(
config.OIDC_CLIENT_ID, scope=[OIDC_SCOPES], redirect_uri=redirect_uri config.OIDC_CLIENT_ID, scope=[OIDC_SCOPES], redirect_uri=redirect_uri
) )
authorization_url, state = oidc.authorization_url(OIDC_AUTHORIZATION_URL) authorization_url, state = oidc.authorization_url(auth_url)
# State is used to prevent CSRF, keep this for later. # State is used to prevent CSRF, keep this for later.
session[SESSION_STATE_KEY] = state session[SESSION_STATE_KEY] = state
session[SESSION_NEXT_KEY] = next_url
return redirect(authorization_url) return redirect(authorization_url)
@ -60,19 +59,23 @@ def oidc_callback():
flash("Please use another sign in method then", "warning") flash("Please use another sign in method then", "warning")
return redirect("/") return redirect("/")
oidc_configuration = requests.get(config.OIDC_WELL_KNOWN_URL).json()
user_info_url = oidc_configuration["userinfo_endpoint"]
token_url = oidc_configuration["token_endpoint"]
oidc = OAuth2Session( oidc = OAuth2Session(
config.OIDC_CLIENT_ID, config.OIDC_CLIENT_ID,
state=session[SESSION_STATE_KEY], state=session[SESSION_STATE_KEY],
scope=[OIDC_SCOPES], scope=[OIDC_SCOPES],
redirect_uri=_redirect_uri, redirect_uri=redirect_uri,
) )
oidc.fetch_token( oidc.fetch_token(
OIDC_TOKEN_URL, token_url,
client_secret=config.OIDC_CLIENT_SECRET, client_secret=config.OIDC_CLIENT_SECRET,
authorization_response=request.url, authorization_response=request.url,
) )
oidc_user_data = oidc.get(OIDC_USER_INFO_URL) oidc_user_data = oidc.get(user_info_url)
if oidc_user_data.status_code != 200: if oidc_user_data.status_code != 200:
LOG.e( LOG.e(
f"cannot get oidc user data {oidc_user_data.status_code} {oidc_user_data.text}" f"cannot get oidc user data {oidc_user_data.status_code} {oidc_user_data.text}"
@ -111,7 +114,8 @@ def oidc_callback():
Session.commit() Session.commit()
# The activation link contains the original page, for ex authorize page # The activation link contains the original page, for ex authorize page
next_url = sanitize_next_url(request.args.get("next")) if request.args else None next_url = session[SESSION_NEXT_KEY]
session[SESSION_NEXT_KEY] = None
return after_login(user, next_url) return after_login(user, next_url)

View File

@ -245,9 +245,7 @@ FACEBOOK_CLIENT_ID = os.environ.get("FACEBOOK_CLIENT_ID")
FACEBOOK_CLIENT_SECRET = os.environ.get("FACEBOOK_CLIENT_SECRET") FACEBOOK_CLIENT_SECRET = os.environ.get("FACEBOOK_CLIENT_SECRET")
CONNECT_WITH_OIDC_ICON = os.environ.get("CONNECT_WITH_OIDC_ICON") CONNECT_WITH_OIDC_ICON = os.environ.get("CONNECT_WITH_OIDC_ICON")
OIDC_AUTHORIZATION_URL = os.environ.get("OIDC_AUTHORIZATION_URL") OIDC_WELL_KNOWN_URL = os.environ.get("OIDC_WELL_KNOWN_URL")
OIDC_USER_INFO_URL = os.environ.get("OIDC_USER_INFO_URL")
OIDC_TOKEN_URL = os.environ.get("OIDC_TOKEN_URL")
OIDC_CLIENT_ID = os.environ.get("OIDC_CLIENT_ID") OIDC_CLIENT_ID = os.environ.get("OIDC_CLIENT_ID")
OIDC_CLIENT_SECRET = os.environ.get("OIDC_CLIENT_SECRET") OIDC_CLIENT_SECRET = os.environ.get("OIDC_CLIENT_SECRET")
OIDC_SCOPES = os.environ.get("OIDC_SCOPES") OIDC_SCOPES = os.environ.get("OIDC_SCOPES")

View File

@ -11,9 +11,11 @@ from wtforms.fields.html5 import EmailField
from app.config import ENFORCE_SPF, MAILBOX_SECRET from app.config import ENFORCE_SPF, MAILBOX_SECRET
from app.config import URL from app.config import URL
from app.dashboard.base import dashboard_bp from app.dashboard.base import dashboard_bp
from app.dashboard.views.enter_sudo import sudo_required
from app.db import Session from app.db import Session
from app.email_utils import email_can_be_used_as_mailbox from app.email_utils import email_can_be_used_as_mailbox
from app.email_utils import mailbox_already_used, render, send_email from app.email_utils import mailbox_already_used, render, send_email
from app.extensions import limiter
from app.log import LOG from app.log import LOG
from app.models import Alias, AuthorizedAddress from app.models import Alias, AuthorizedAddress
from app.models import Mailbox from app.models import Mailbox
@ -29,6 +31,8 @@ class ChangeEmailForm(FlaskForm):
@dashboard_bp.route("/mailbox/<int:mailbox_id>/", methods=["GET", "POST"]) @dashboard_bp.route("/mailbox/<int:mailbox_id>/", methods=["GET", "POST"])
@login_required @login_required
@sudo_required
@limiter.limit("20/minute", methods=["POST"])
def mailbox_detail_route(mailbox_id): def mailbox_detail_route(mailbox_id):
mailbox: Mailbox = Mailbox.get(mailbox_id) mailbox: Mailbox = Mailbox.get(mailbox_id)
if not mailbox or mailbox.user_id != current_user.id: if not mailbox or mailbox.user_id != current_user.id:
@ -179,8 +183,15 @@ def mailbox_detail_route(mailbox_id):
elif request.form.get("form-name") == "toggle-pgp": elif request.form.get("form-name") == "toggle-pgp":
if request.form.get("pgp-enabled") == "on": if request.form.get("pgp-enabled") == "on":
mailbox.disable_pgp = False if mailbox.is_proton():
flash(f"PGP is enabled on {mailbox.email}", "success") mailbox.disable_pgp = True
flash(
"Enabling PGP for a Proton Mail mailbox is redundant and does not add any security benefit",
"info",
)
else:
mailbox.disable_pgp = False
flash(f"PGP is enabled on {mailbox.email}", "info")
else: else:
mailbox.disable_pgp = True mailbox.disable_pgp = True
flash(f"PGP is disabled on {mailbox.email}", "info") flash(f"PGP is disabled on {mailbox.email}", "info")

View File

@ -227,6 +227,21 @@ def setting():
Session.commit() Session.commit()
flash("Your preference has been updated", "success") flash("Your preference has been updated", "success")
return redirect(url_for("dashboard.setting")) return redirect(url_for("dashboard.setting"))
elif request.form.get("form-name") == "enable_data_breach_check":
if not current_user.is_premium():
flash("Only premium plan can enable data breach monitoring", "warning")
return redirect(url_for("dashboard.setting"))
choose = request.form.get("enable_data_breach_check")
if choose == "on":
LOG.i("User {current_user} has enabled data breach monitoring")
current_user.enable_data_breach_check = True
flash("Data breach monitoring is enabled", "success")
else:
LOG.i("User {current_user} has disabled data breach monitoring")
current_user.enable_data_breach_check = False
flash("Data breach monitoring is disabled", "info")
Session.commit()
return redirect(url_for("dashboard.setting"))
elif request.form.get("form-name") == "sender-in-ra": elif request.form.get("form-name") == "sender-in-ra":
choose = request.form.get("enable") choose = request.form.get("enable")
if choose == "on": if choose == "on":

View File

@ -525,6 +525,11 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
sa.Boolean, default=True, nullable=False, server_default="1" sa.Boolean, default=True, nullable=False, server_default="1"
) )
# user opted in for data breach check
enable_data_breach_check = sa.Column(
sa.Boolean, default=False, nullable=False, server_default="0"
)
# bitwise flags. Allow for future expansion # bitwise flags. Allow for future expansion
flags = sa.Column( flags = sa.Column(
sa.BigInteger, sa.BigInteger,
@ -2644,10 +2649,15 @@ class Mailbox(Base, ModelMixin):
return False return False
def nb_alias(self): def nb_alias(self):
return ( alias_ids = set(
AliasMailbox.filter_by(mailbox_id=self.id).count() am.alias_id
+ Alias.filter_by(mailbox_id=self.id).count() for am in AliasMailbox.filter_by(mailbox_id=self.id).values(
AliasMailbox.alias_id
)
) )
for alias in Alias.filter_by(mailbox_id=self.id).values(Alias.id):
alias_ids.add(alias.id)
return len(alias_ids)
def is_proton(self) -> bool: def is_proton(self) -> bool:
if ( if (
@ -2696,12 +2706,15 @@ class Mailbox(Base, ModelMixin):
@property @property
def aliases(self) -> [Alias]: def aliases(self) -> [Alias]:
ret = Alias.filter_by(mailbox_id=self.id).all() ret = dict(
(alias.id, alias) for alias in Alias.filter_by(mailbox_id=self.id).all()
)
for am in AliasMailbox.filter_by(mailbox_id=self.id): for am in AliasMailbox.filter_by(mailbox_id=self.id):
ret.append(am.alias) if am.alias_id not in ret:
ret[am.alias_id] = am.alias
return ret return list(ret.values())
@classmethod @classmethod
def create(cls, **kw): def create(cls, **kw):

View File

@ -1070,6 +1070,7 @@ def get_alias_to_check_hibp(
Alias.id >= min_alias_id, Alias.id >= min_alias_id,
Alias.id < max_alias_id, Alias.id < max_alias_id,
User.disabled == False, # noqa: E712 User.disabled == False, # noqa: E712
User.enable_data_breach_check,
or_( or_(
User.lifetime, User.lifetime,
ManualSubscription.end_at > now, ManualSubscription.end_at > now,

View File

@ -1180,6 +1180,7 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str):
# References and In-Reply-To are used for keeping the email thread # References and In-Reply-To are used for keeping the email thread
headers.REFERENCES, headers.REFERENCES,
headers.IN_REPLY_TO, headers.IN_REPLY_TO,
headers.SL_QUEUE_ID,
] ]
+ headers.MIME_HEADERS, + headers.MIME_HEADERS,
) )

View File

@ -118,9 +118,7 @@ WORDS_FILE_PATH=local_data/test_words.txt
# Login with OIDC # Login with OIDC
# CONNECT_WITH_OIDC_ICON=fa-github # CONNECT_WITH_OIDC_ICON=fa-github
# OIDC_AUTHORIZATION_URL=to_fill # OIDC_WELL_KNOWN_URL=to_fill
# OIDC_USER_INFO_URL=to_fill
# OIDC_TOKEN_URL=to_fill
# OIDC_SCOPES=openid email profile # OIDC_SCOPES=openid email profile
# OIDC_NAME_FIELD=name # OIDC_NAME_FIELD=name
# OIDC_CLIENT_ID=to_fill # OIDC_CLIENT_ID=to_fill

View File

@ -0,0 +1,29 @@
"""empty message
Revision ID: fa2f19bb4e5a
Revises: 52510a633d6f
Create Date: 2024-04-09 13:12:26.305340
"""
import sqlalchemy_utils
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'fa2f19bb4e5a'
down_revision = '52510a633d6f'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('users', sa.Column('enable_data_breach_check', sa.Boolean(), server_default='0', nullable=False))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('users', 'enable_data_breach_check')
# ### end Alembic commands ###

View File

@ -249,6 +249,42 @@
</div> </div>
</div> </div>
<!-- END Random alias --> <!-- END Random alias -->
<!-- Data breach check -->
<div class="card" id="data-breach">
<div class="card-body">
<div class="card-title">Data breach monitoring</div>
<div class="mt-1 mb-3">
{% if not current_user.is_premium() %}
<div class="alert alert-info" role="alert">
This feature is only available on Premium plan.
<a href="{{ url_for('dashboard.pricing') }}"
target="_blank"
rel="noopener noreferrer">
Upgrade<i class="fe fe-external-link"></i>
</a>
</div>
{% endif %}
If enabled, we will inform you via email if one of your aliases appears in a data breach.
<br>
SimpleLogin uses <a href="https://haveibeenpwned.com/">HaveIBeenPwned</a> API for checking for data breaches.
</div>
<form method="post" action="#data-breach">
{{ csrf_form.csrf_token }}
<input type="hidden" name="form-name" value="enable_data_breach_check">
<div class="form-check">
<input type="checkbox"
id="enable_data_breach_check"
name="enable_data_breach_check"
{% if current_user.enable_data_breach_check %} checked{% endif %}
class="form-check-input">
<label for="enable_data_breach_check">Enable data breach monitoring</label>
</div>
<button type="submit" class="btn btn-outline-primary">Update</button>
</form>
</div>
</div>
<!-- END Data breach check -->
<!-- Sender Format --> <!-- Sender Format -->
<div class="card" id="sender-format"> <div class="card" id="sender-format">
<div class="card-body"> <div class="card-body">
@ -285,7 +321,9 @@
No Name (i.e. only reverse-alias) No Name (i.e. only reverse-alias)
</option> </option>
</select> </select>
<button class="btn btn-outline-primary mt-3">Update</button> <button class="btn btn-outline-primary mt-3">
Update
</button>
</form> </form>
</div> </div>
</div> </div>
@ -295,7 +333,9 @@
<div class="card-body"> <div class="card-body">
<div class="card-title"> <div class="card-title">
Reverse Alias Replacement Reverse Alias Replacement
<div class="badge badge-warning">Experimental</div> <div class="badge badge-warning">
Experimental
</div>
</div> </div>
<div class="mb-3"> <div class="mb-3">
When replying to a forwarded email, the <b>reverse-alias</b> can be automatically included When replying to a forwarded email, the <b>reverse-alias</b> can be automatically included
@ -312,9 +352,13 @@
name="replace-ra" name="replace-ra"
{% if current_user.replace_reverse_alias %} checked{% endif %} {% if current_user.replace_reverse_alias %} checked{% endif %}
class="form-check-input"> class="form-check-input">
<label for="replace-ra">Enable replacing reverse alias</label> <label for="replace-ra">
Enable replacing reverse alias
</label>
</div> </div>
<button type="submit" class="btn btn-outline-primary">Update</button> <button type="submit" class="btn btn-outline-primary">
Update
</button>
</form> </form>
</div> </div>
</div> </div>

View File

@ -1,5 +1,5 @@
from app import config from app import config
from flask import url_for from flask import url_for, session
from urllib.parse import parse_qs from urllib.parse import parse_qs
from urllib3.util import parse_url from urllib3.util import parse_url
from app.auth.views.oidc import create_user from app.auth.views.oidc import create_user
@ -10,7 +10,21 @@ from app.models import User
from app.config import URL, OIDC_CLIENT_ID from app.config import URL, OIDC_CLIENT_ID
def test_oidc_login(flask_client): mock_well_known_response = {
"authorization_endpoint": "http://localhost:7777/authorization-endpoint",
"userinfo_endpoint": "http://localhost:7777/userinfo-endpoint",
"token_endpoint": "http://localhost:7777/token-endpoint",
}
@patch("requests.get")
def test_oidc_login(mock_get, flask_client):
config.OIDC_WELL_KNOWN_URL = "http://localhost:7777/well-known-url"
with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
mock_get.return_value.json.return_value = mock_well_known_response
r = flask_client.get( r = flask_client.get(
url_for("auth.oidc_login"), url_for("auth.oidc_login"),
follow_redirects=False, follow_redirects=False,
@ -28,8 +42,41 @@ def test_oidc_login(flask_client):
assert expected_redirect_url == query["redirect_uri"][0] assert expected_redirect_url == query["redirect_uri"][0]
def test_oidc_login_no_client_id(flask_client): @patch("requests.get")
def test_oidc_login_next_url(mock_get, flask_client):
config.OIDC_WELL_KNOWN_URL = "http://localhost:7777/well-known-url"
with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
mock_get.return_value.json.return_value = mock_well_known_response
with flask_client:
r = flask_client.get(
url_for("auth.oidc_login", next="/dashboard/settings/"),
follow_redirects=False,
)
location = r.headers.get("Location")
assert location is not None
parsed = parse_url(location)
query = parse_qs(parsed.query)
expected_redirect_url = f"{URL}/auth/oidc/callback"
assert "code" == query["response_type"][0]
assert OIDC_CLIENT_ID == query["client_id"][0]
assert expected_redirect_url == query["redirect_uri"][0]
assert session["oauth_redirect_next"] == "/dashboard/settings/"
@patch("requests.get")
def test_oidc_login_no_client_id(mock_get, flask_client):
config.OIDC_CLIENT_ID = None config.OIDC_CLIENT_ID = None
config.OIDC_WELL_KNOWN_URL = "http://localhost:7777/well-known-url"
with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
mock_get.return_value.json.return_value = mock_well_known_response
r = flask_client.get( r = flask_client.get(
url_for("auth.oidc_login"), url_for("auth.oidc_login"),
@ -47,8 +94,14 @@ def test_oidc_login_no_client_id(flask_client):
config.OIDC_CLIENT_ID = "to_fill" config.OIDC_CLIENT_ID = "to_fill"
def test_oidc_login_no_client_secret(flask_client): @patch("requests.get")
def test_oidc_login_no_client_secret(mock_get, flask_client):
config.OIDC_CLIENT_SECRET = None config.OIDC_CLIENT_SECRET = None
config.OIDC_WELL_KNOWN_URL = "http://localhost:7777/well-known-url"
with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
mock_get.return_value.json.return_value = mock_well_known_response
r = flask_client.get( r = flask_client.get(
url_for("auth.oidc_login"), url_for("auth.oidc_login"),
@ -66,9 +119,14 @@ def test_oidc_login_no_client_secret(flask_client):
config.OIDC_CLIENT_SECRET = "to_fill" config.OIDC_CLIENT_SECRET = "to_fill"
def test_oidc_callback_no_oauth_state(flask_client): @patch("requests.get")
with flask_client.session_transaction() as session: def test_oidc_callback_no_oauth_state(mock_get, flask_client):
session["oauth_state"] = None config.OIDC_WELL_KNOWN_URL = "http://localhost:7777/well-known-url"
with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
sess["oauth_state"] = None
mock_get.return_value.json.return_value = mock_well_known_response
r = flask_client.get( r = flask_client.get(
url_for("auth.oidc_callback"), url_for("auth.oidc_callback"),
@ -78,11 +136,16 @@ def test_oidc_callback_no_oauth_state(flask_client):
assert location is None assert location is None
def test_oidc_callback_no_client_id(flask_client): @patch("requests.get")
with flask_client.session_transaction() as session: def test_oidc_callback_no_client_id(mock_get, flask_client):
session["oauth_state"] = "state" config.OIDC_WELL_KNOWN_URL = "http://localhost:7777/well-known-url"
with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
sess["oauth_state"] = "state"
config.OIDC_CLIENT_ID = None config.OIDC_CLIENT_ID = None
mock_get.return_value.json.return_value = mock_well_known_response
r = flask_client.get( r = flask_client.get(
url_for("auth.oidc_callback"), url_for("auth.oidc_callback"),
follow_redirects=False, follow_redirects=False,
@ -97,15 +160,20 @@ def test_oidc_callback_no_client_id(flask_client):
assert expected_redirect_url == parsed.path assert expected_redirect_url == parsed.path
config.OIDC_CLIENT_ID = "to_fill" config.OIDC_CLIENT_ID = "to_fill"
with flask_client.session_transaction() as session: with flask_client.session_transaction() as sess:
session["oauth_state"] = None sess["oauth_state"] = None
def test_oidc_callback_no_client_secret(flask_client): @patch("requests.get")
with flask_client.session_transaction() as session: def test_oidc_callback_no_client_secret(mock_get, flask_client):
session["oauth_state"] = "state" config.OIDC_WELL_KNOWN_URL = "http://localhost:7777/well-known-url"
with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
sess["oauth_state"] = "state"
config.OIDC_CLIENT_SECRET = None config.OIDC_CLIENT_SECRET = None
mock_get.return_value.json.return_value = mock_well_known_response
r = flask_client.get( r = flask_client.get(
url_for("auth.oidc_callback"), url_for("auth.oidc_callback"),
follow_redirects=False, follow_redirects=False,
@ -120,16 +188,23 @@ def test_oidc_callback_no_client_secret(flask_client):
assert expected_redirect_url == parsed.path assert expected_redirect_url == parsed.path
config.OIDC_CLIENT_SECRET = "to_fill" config.OIDC_CLIENT_SECRET = "to_fill"
with flask_client.session_transaction() as session: with flask_client.session_transaction() as sess:
session["oauth_state"] = None sess["oauth_state"] = None
@patch("requests.get")
@patch("requests_oauthlib.OAuth2Session.fetch_token") @patch("requests_oauthlib.OAuth2Session.fetch_token")
@patch("requests_oauthlib.OAuth2Session.get") @patch("requests_oauthlib.OAuth2Session.get")
def test_oidc_callback_invalid_user(mock_get, mock_fetch_token, flask_client): def test_oidc_callback_invalid_user(
mock_get.return_value = MockResponse(400, {}) mock_oauth_get, mock_fetch_token, mock_get, flask_client
with flask_client.session_transaction() as session: ):
session["oauth_state"] = "state" mock_oauth_get.return_value = MockResponse(400, {})
config.OIDC_WELL_KNOWN_URL = "http://localhost:7777/well-known-url"
with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
sess["oauth_state"] = "state"
mock_get.return_value.json.return_value = mock_well_known_response
r = flask_client.get( r = flask_client.get(
url_for("auth.oidc_callback"), url_for("auth.oidc_callback"),
@ -143,18 +218,25 @@ def test_oidc_callback_invalid_user(mock_get, mock_fetch_token, flask_client):
expected_redirect_url = "/auth/login" expected_redirect_url = "/auth/login"
assert expected_redirect_url == parsed.path assert expected_redirect_url == parsed.path
assert mock_get.called assert mock_oauth_get.called
with flask_client.session_transaction() as session: with flask_client.session_transaction() as sess:
session["oauth_state"] = None sess["oauth_state"] = None
@patch("requests.get")
@patch("requests_oauthlib.OAuth2Session.fetch_token") @patch("requests_oauthlib.OAuth2Session.fetch_token")
@patch("requests_oauthlib.OAuth2Session.get") @patch("requests_oauthlib.OAuth2Session.get")
def test_oidc_callback_no_email(mock_get, mock_fetch_token, flask_client): def test_oidc_callback_no_email(
mock_get.return_value = MockResponse(200, {}) mock_oauth_get, mock_fetch_token, mock_get, flask_client
with flask_client.session_transaction() as session: ):
session["oauth_state"] = "state" mock_oauth_get.return_value = MockResponse(200, {})
config.OIDC_WELL_KNOWN_URL = "http://localhost:7777/well-known-url"
with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
sess["oauth_state"] = "state"
mock_get.return_value.json.return_value = mock_well_known_response
r = flask_client.get( r = flask_client.get(
url_for("auth.oidc_callback"), url_for("auth.oidc_callback"),
@ -168,20 +250,27 @@ def test_oidc_callback_no_email(mock_get, mock_fetch_token, flask_client):
expected_redirect_url = "/auth/login" expected_redirect_url = "/auth/login"
assert expected_redirect_url == parsed.path assert expected_redirect_url == parsed.path
assert mock_get.called assert mock_oauth_get.called
with flask_client.session_transaction() as session: with flask_client.session_transaction() as session:
session["oauth_state"] = None session["oauth_state"] = None
@patch("requests.get")
@patch("requests_oauthlib.OAuth2Session.fetch_token") @patch("requests_oauthlib.OAuth2Session.fetch_token")
@patch("requests_oauthlib.OAuth2Session.get") @patch("requests_oauthlib.OAuth2Session.get")
def test_oidc_callback_disabled_registration(mock_get, mock_fetch_token, flask_client): def test_oidc_callback_disabled_registration(
mock_oauth_get, mock_fetch_token, mock_get, flask_client
):
config.DISABLE_REGISTRATION = True config.DISABLE_REGISTRATION = True
email = random_string() email = random_string()
mock_get.return_value = MockResponse(200, {"email": email}) mock_oauth_get.return_value = MockResponse(200, {"email": email})
with flask_client.session_transaction() as session: config.OIDC_WELL_KNOWN_URL = "http://localhost:7777/well-known-url"
session["oauth_state"] = "state" with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
sess["oauth_state"] = "state"
mock_get.return_value.json.return_value = mock_well_known_response
r = flask_client.get( r = flask_client.get(
url_for("auth.oidc_callback"), url_for("auth.oidc_callback"),
@ -195,26 +284,33 @@ def test_oidc_callback_disabled_registration(mock_get, mock_fetch_token, flask_c
expected_redirect_url = "/auth/register" expected_redirect_url = "/auth/register"
assert expected_redirect_url == parsed.path assert expected_redirect_url == parsed.path
assert mock_get.called assert mock_oauth_get.called
config.DISABLE_REGISTRATION = False config.DISABLE_REGISTRATION = False
with flask_client.session_transaction() as session: with flask_client.session_transaction() as sess:
session["oauth_state"] = None sess["oauth_state"] = None
@patch("requests.get")
@patch("requests_oauthlib.OAuth2Session.fetch_token") @patch("requests_oauthlib.OAuth2Session.fetch_token")
@patch("requests_oauthlib.OAuth2Session.get") @patch("requests_oauthlib.OAuth2Session.get")
def test_oidc_callback_registration(mock_get, mock_fetch_token, flask_client): def test_oidc_callback_registration(
mock_oauth_get, mock_fetch_token, mock_get, flask_client
):
email = random_string() email = random_string()
mock_get.return_value = MockResponse( mock_oauth_get.return_value = MockResponse(
200, 200,
{ {
"email": email, "email": email,
config.OIDC_NAME_FIELD: "name", config.OIDC_NAME_FIELD: "name",
}, },
) )
with flask_client.session_transaction() as session: config.OIDC_WELL_KNOWN_URL = "http://localhost:7777/well-known-url"
session["oauth_state"] = "state" with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
sess["oauth_state"] = "state"
mock_get.return_value.json.return_value = mock_well_known_response
user = User.get_by(email=email) user = User.get_by(email=email)
assert user is None assert user is None
@ -231,28 +327,33 @@ def test_oidc_callback_registration(mock_get, mock_fetch_token, flask_client):
expected_redirect_url = "/dashboard/" expected_redirect_url = "/dashboard/"
assert expected_redirect_url == parsed.path assert expected_redirect_url == parsed.path
assert mock_get.called assert mock_oauth_get.called
user = User.get_by(email=email) user = User.get_by(email=email)
assert user is not None assert user is not None
assert user.email == email assert user.email == email
with flask_client.session_transaction() as session: with flask_client.session_transaction() as sess:
session["oauth_state"] = None sess["oauth_state"] = None
@patch("requests.get")
@patch("requests_oauthlib.OAuth2Session.fetch_token") @patch("requests_oauthlib.OAuth2Session.fetch_token")
@patch("requests_oauthlib.OAuth2Session.get") @patch("requests_oauthlib.OAuth2Session.get")
def test_oidc_callback_login(mock_get, mock_fetch_token, flask_client): def test_oidc_callback_login(mock_oauth_get, mock_fetch_token, mock_get, flask_client):
email = random_string() email = random_string()
mock_get.return_value = MockResponse( mock_oauth_get.return_value = MockResponse(
200, 200,
{ {
"email": email, "email": email,
}, },
) )
with flask_client.session_transaction() as session: config.OIDC_WELL_KNOWN_URL = "http://localhost:7777/well-known-url"
session["oauth_state"] = "state" with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
sess["oauth_state"] = "state"
mock_get.return_value.json.return_value = mock_well_known_response
user = User.create( user = User.create(
email=email, email=email,
@ -275,10 +376,57 @@ def test_oidc_callback_login(mock_get, mock_fetch_token, flask_client):
expected_redirect_url = "/dashboard/" expected_redirect_url = "/dashboard/"
assert expected_redirect_url == parsed.path assert expected_redirect_url == parsed.path
assert mock_get.called assert mock_oauth_get.called
with flask_client.session_transaction() as session: with flask_client.session_transaction() as sess:
session["oauth_state"] = None sess["oauth_state"] = None
@patch("requests.get")
@patch("requests_oauthlib.OAuth2Session.fetch_token")
@patch("requests_oauthlib.OAuth2Session.get")
def test_oidc_callback_login_with_next_url(
mock_oauth_get, mock_fetch_token, mock_get, flask_client
):
email = random_string()
mock_oauth_get.return_value = MockResponse(
200,
{
"email": email,
},
)
config.OIDC_WELL_KNOWN_URL = "http://localhost:7777/well-known-url"
with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = "/dashboard/settings/"
sess["oauth_state"] = "state"
mock_get.return_value.json.return_value = mock_well_known_response
user = User.create(
email=email,
name="name",
password="",
activated=True,
)
user = User.get_by(email=email)
assert user is not None
r = flask_client.get(
url_for("auth.oidc_callback"),
follow_redirects=False,
)
location = r.headers.get("Location")
assert location is not None
parsed = parse_url(location)
expected_redirect_url = "/dashboard/settings/"
assert expected_redirect_url == parsed.path
assert mock_oauth_get.called
with flask_client.session_transaction() as sess:
sess["oauth_state"] = None
def test_create_user(): def test_create_user():

View File

@ -31,6 +31,7 @@ def test_get_alias_for_free_user_has_no_alias():
def test_get_alias_for_lifetime_with_null_hibp_date(): def test_get_alias_for_lifetime_with_null_hibp_date():
user = create_new_user() user = create_new_user()
user.lifetime = True user.lifetime = True
user.enable_data_breach_check = True
alias_id = Alias.create_new_random(user).id alias_id = Alias.create_new_random(user).id
Session.commit() Session.commit()
aliases = list( aliases = list(
@ -42,6 +43,7 @@ def test_get_alias_for_lifetime_with_null_hibp_date():
def test_get_alias_for_lifetime_with_old_hibp_date(): def test_get_alias_for_lifetime_with_old_hibp_date():
user = create_new_user() user = create_new_user()
user.lifetime = True user.lifetime = True
user.enable_data_breach_check = True
alias = Alias.create_new_random(user) alias = Alias.create_new_random(user)
alias.hibp_last_check = arrow.now().shift(days=-1) alias.hibp_last_check = arrow.now().shift(days=-1)
alias_id = alias.id alias_id = alias.id
@ -97,6 +99,7 @@ sub_generator_list = [
@pytest.mark.parametrize("sub_generator", sub_generator_list) @pytest.mark.parametrize("sub_generator", sub_generator_list)
def test_get_alias_for_sub(sub_generator): def test_get_alias_for_sub(sub_generator):
user = create_new_user() user = create_new_user()
user.enable_data_breach_check = True
sub_generator(user) sub_generator(user)
alias_id = Alias.create_new_random(user).id alias_id = Alias.create_new_random(user).id
Session.commit() Session.commit()
@ -140,3 +143,26 @@ def test_already_checked_is_not_checked():
cron.get_alias_to_check_hibp(arrow.now(), [user.id], alias_id, alias_id + 1) cron.get_alias_to_check_hibp(arrow.now(), [user.id], alias_id, alias_id + 1)
) )
assert len(aliases) == 0 assert len(aliases) == 0
def test_outed_in_user_is_checked():
user = create_new_user()
user.lifetime = True
user.enable_data_breach_check = True
alias_id = Alias.create_new_random(user).id
Session.commit()
aliases = list(
cron.get_alias_to_check_hibp(arrow.now(), [], alias_id, alias_id + 1)
)
assert len(aliases) == 1
def test_outed_out_user_is_not_checked():
user = create_new_user()
user.lifetime = True
alias_id = Alias.create_new_random(user).id
Session.commit()
aliases = list(
cron.get_alias_to_check_hibp(arrow.now(), [], alias_id, alias_id + 1)
)
assert len(aliases) == 0

View File

@ -51,9 +51,7 @@ FACEBOOK_CLIENT_SECRET=to_fill
# Login with OIDC # Login with OIDC
CONNECT_WITH_OIDC_ICON=fa-github CONNECT_WITH_OIDC_ICON=fa-github
OIDC_AUTHORIZATION_URL=to_fill OIDC_WELL_KNOWN_URL=to_fill
OIDC_USER_INFO_URL=to_fill
OIDC_TOKEN_URL=to_fill
OIDC_SCOPES=openid email profile OIDC_SCOPES=openid email profile
OIDC_NAME_FIELD=name OIDC_NAME_FIELD=name
OIDC_CLIENT_ID=to_fill OIDC_CLIENT_ID=to_fill