Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions fasthtml/_modidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,38 @@
'fasthtml.jupyter.wait_port_free': ('api/jupyter.html#wait_port_free', 'fasthtml/jupyter.py'),
'fasthtml.jupyter.ws_client': ('api/jupyter.html#ws_client', 'fasthtml/jupyter.py')},
'fasthtml.live_reload': {},
'fasthtml.magickey': { 'fasthtml.magickey.MagicKey': ('api/magickey.html#magickey', 'fasthtml/magickey.py'),
'fasthtml.magickey.MagicKey.__init__': ('api/magickey.html#magickey.__init__', 'fasthtml/magickey.py'),
'fasthtml.magickey.MagicKey._do_auth': ('api/magickey.html#magickey._do_auth', 'fasthtml/magickey.py'),
'fasthtml.magickey.MagicKey._finish_passkey_reg': ( 'api/magickey.html#magickey._finish_passkey_reg',
'fasthtml/magickey.py'),
'fasthtml.magickey.MagicKey._logout': ('api/magickey.html#magickey._logout', 'fasthtml/magickey.py'),
'fasthtml.magickey.MagicKey._request_passkey_auth': ( 'api/magickey.html#magickey._request_passkey_auth',
'fasthtml/magickey.py'),
'fasthtml.magickey.MagicKey._request_passkey_reg': ( 'api/magickey.html#magickey._request_passkey_reg',
'fasthtml/magickey.py'),
'fasthtml.magickey.MagicKey._send_magic_link': ( 'api/magickey.html#magickey._send_magic_link',
'fasthtml/magickey.py'),
'fasthtml.magickey.MagicKey._skip_passkey_reg': ( 'api/magickey.html#magickey._skip_passkey_reg',
'fasthtml/magickey.py'),
'fasthtml.magickey.MagicKey._verify_magiclink': ( 'api/magickey.html#magickey._verify_magiclink',
'fasthtml/magickey.py'),
'fasthtml.magickey.MagicKey._verify_passkey_auth': ( 'api/magickey.html#magickey._verify_passkey_auth',
'fasthtml/magickey.py'),
'fasthtml.magickey.MagicKey.after_magiclink_verify': ( 'api/magickey.html#magickey.after_magiclink_verify',
'fasthtml/magickey.py'),
'fasthtml.magickey.MagicKey.get_auth': ('api/magickey.html#magickey.get_auth', 'fasthtml/magickey.py'),
'fasthtml.magickey.MagicKey.get_passkey': ( 'api/magickey.html#magickey.get_passkey',
'fasthtml/magickey.py'),
'fasthtml.magickey.MagicKey.get_user_id': ( 'api/magickey.html#magickey.get_user_id',
'fasthtml/magickey.py'),
'fasthtml.magickey.MagicKey.has_passkey': ( 'api/magickey.html#magickey.has_passkey',
'fasthtml/magickey.py'),
'fasthtml.magickey.MagicKey.save_passkey': ( 'api/magickey.html#magickey.save_passkey',
'fasthtml/magickey.py'),
'fasthtml.magickey.MagicKey.update_passkey': ( 'api/magickey.html#magickey.update_passkey',
'fasthtml/magickey.py'),
'fasthtml.magickey._webauthn_js': ('api/magickey.html#_webauthn_js', 'fasthtml/magickey.py')},
'fasthtml.oauth': { 'fasthtml.oauth.AppleAppClient': ('api/oauth.html#appleappclient', 'fasthtml/oauth.py'),
'fasthtml.oauth.AppleAppClient.__init__': ('api/oauth.html#appleappclient.__init__', 'fasthtml/oauth.py'),
'fasthtml.oauth.AppleAppClient.client_secret': ( 'api/oauth.html#appleappclient.client_secret',
Expand Down Expand Up @@ -254,6 +286,19 @@
'fasthtml.pico.PicoBusy': ('api/pico.html#picobusy', 'fasthtml/pico.py'),
'fasthtml.pico.Search': ('api/pico.html#search', 'fasthtml/pico.py'),
'fasthtml.pico.set_pico_cls': ('api/pico.html#set_pico_cls', 'fasthtml/pico.py')},
'fasthtml.ratelimit': { 'fasthtml.ratelimit.Limiter': ('api/ratelimit.html#limiter', 'fasthtml/ratelimit.py'),
'fasthtml.ratelimit.Limiter.__call__': ('api/ratelimit.html#limiter.__call__', 'fasthtml/ratelimit.py'),
'fasthtml.ratelimit.Limiter.__init__': ('api/ratelimit.html#limiter.__init__', 'fasthtml/ratelimit.py'),
'fasthtml.ratelimit.TokenBucket': ('api/ratelimit.html#tokenbucket', 'fasthtml/ratelimit.py'),
'fasthtml.ratelimit.TokenBucket.__init__': ( 'api/ratelimit.html#tokenbucket.__init__',
'fasthtml/ratelimit.py'),
'fasthtml.ratelimit.TokenBucket.__repr__': ( 'api/ratelimit.html#tokenbucket.__repr__',
'fasthtml/ratelimit.py'),
'fasthtml.ratelimit.TokenBucket._prune': ( 'api/ratelimit.html#tokenbucket._prune',
'fasthtml/ratelimit.py'),
'fasthtml.ratelimit.TokenBucket.wait': ('api/ratelimit.html#tokenbucket.wait', 'fasthtml/ratelimit.py'),
'fasthtml.ratelimit.client_ip': ('api/ratelimit.html#client_ip', 'fasthtml/ratelimit.py'),
'fasthtml.ratelimit.parse_rate': ('api/ratelimit.html#parse_rate', 'fasthtml/ratelimit.py')},
'fasthtml.starlette': {},
'fasthtml.stripe_otp': { 'fasthtml.stripe_otp.Payment': ('explains/stripe.html#payment', 'fasthtml/stripe_otp.py'),
'fasthtml.stripe_otp._search_app': ('explains/stripe.html#_search_app', 'fasthtml/stripe_otp.py'),
Expand Down
165 changes: 165 additions & 0 deletions fasthtml/magickey.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
"""Passwordless auth combining magic links and passkeys"""

# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/api/10_magickey.ipynb.

# %% auto #0
__all__ = ['MagicKey']

# %% ../nbs/api/10_magickey.ipynb #d3fd43fe
import secrets, time
from json import loads
from urllib.parse import urlparse

from webauthn import generate_authentication_options, generate_registration_options, verify_authentication_response, verify_registration_response, options_to_json
from webauthn.helpers import base64url_to_bytes, bytes_to_base64url
from webauthn.helpers.structs import AuthenticatorSelectionCriteria, ResidentKeyRequirement, UserVerificationRequirement

from fastcore.utils import *
from fastcore.xml import *
from fastlite import *
from .basics import *
from .starlette import *
from .fastapp import *
from .ratelimit import *

# %% ../nbs/api/10_magickey.ipynb #03d86e44
# _origin_kw removed; public_origin / rp_id inlined at call sites


# %% ../nbs/api/10_magickey.ipynb #02b50590
def _webauthn_js(opts, action, callback):
return Script("SimpleWebAuthnBrowser.%s({ optionsJSON: %s }).then(r => {"
"htmx.ajax('POST', '%s', { values: r });});" % (action, options_to_json(opts), callback))

# %% ../nbs/api/10_magickey.ipynb #032d5010
class MagicKey:
def __init__(self,
app, # FastHTML app instance
send_email, # `f(email, magic_url)` — sends the magic link
public_origin, # Full origin URL, e.g. `https://example.com`
rp_id=None, # WebAuthn relying party ID (default: hostname from `public_origin`)
rp_name=None, # WebAuthn relying party display name (default: `rp_id`)
skip=None, # Routes to skip auth beforeware (default: all MagicKey routes)
login_path='/login', # Login page route
logout_path='/logout', # Logout route
token_expiry=3600, # Magic link validity in seconds
email_rate='3/m', # Per-email rate limit on magic link sends (None to disable)
ip_rate='10/m', # Per-IP rate limit on magic link sends (None to disable)
webauthn_js=None): # Script tag or URL for SimpleWebAuthn browser JS (default: jsdelivr CDN)
"Passwordless auth combining magic links and passkeys"
if not rp_id: rp_id = urlparse(public_origin).hostname
self.magiclink_db = {}
self._email_bucket = TokenBucket(email_rate) if email_rate else None
self._ip_bucket = TokenBucket(ip_rate) if ip_rate else None
store_attr()
async def _before(req, session):
if 'auth' not in req.scope: req.scope['auth'] = session.get('auth')
if not req.scope['auth']: return RedirectResponse(login_path, status_code=303)
if not skip: skip = [login_path, logout_path, '/finish_passkey_reg', '/send_magic_link',
'/request_passkey_auth', '/request_passkey_reg', '/setup_passkey',
'/skip_passkey_reg', '/verify_magiclink', '/verify_passkey_auth',]
app.before.append(Beforeware(_before, skip=skip))
if not webauthn_js: webauthn_js = Script(src='https://cdn.jsdelivr.net/npm/@simplewebauthn/[email protected]/dist/bundle/index.umd.min.js')
elif isinstance(webauthn_js, str): webauthn_js = Script(src=webauthn_js)
app.hdrs += (webauthn_js,)
app.get(logout_path)(self._logout)
app.post('/send_magic_link')(self._send_magic_link)
app.get('/verify_magiclink')(self._verify_magiclink)
app.post('/request_passkey_auth')(self._request_passkey_auth)
app.post('/verify_passkey_auth')(self._verify_passkey_auth)
app.post('/request_passkey_reg')(self._request_passkey_reg)
app.post('/finish_passkey_reg')(self._finish_passkey_reg)
app.post('/skip_passkey_reg')(self._skip_passkey_reg)

def _logout(self, session):
session.pop('auth', None)
return RedirectResponse(self.login_path, status_code=303)

def _send_magic_link(self, email: str, req):
for bucket,key in ((self._ip_bucket, client_ip(req)), (self._email_bucket, email)):
if bucket and (w:=bucket.wait(key)): return Response('Too many requests', status_code=429, headers={'Retry-After': str(int(w)+1)})
self.magiclink_db = {k:v for k,v in self.magiclink_db.items() if not v['used'] and time.time() - v['timestamp'] <= self.token_expiry}
token = secrets.token_urlsafe(32)
self.magiclink_db[token] = dict(email=email, timestamp=time.time(), used=False)
magic_url = f'{self.public_origin}/verify_magiclink?token={token}'
return self.send_email(email, magic_url)

def _verify_magiclink(self, token: str, session, req):
link = self.magiclink_db.get(token)
if not link or link['used'] or time.time() - link['timestamp'] > self.token_expiry:
return RedirectResponse(f'{self.login_path}?error=invalid_link', status_code=303)
link['used'] = True
return self.after_magiclink_verify(link['email'], session, req)

def _request_passkey_auth(self, session):
auth_opts = generate_authentication_options(
rp_id=self.rp_id, user_verification=UserVerificationRequirement.REQUIRED)
session['auth_challenge'] = bytes_to_base64url(auth_opts.challenge)
return _webauthn_js(auth_opts, 'startAuthentication', '/verify_passkey_auth')

def _verify_passkey_auth(self, response: str, id: str, rawId: str, type: str, session):
challenge_b64 = session.pop('auth_challenge', None)
if not challenge_b64: return HttpHeader('HX-Redirect', f'{self.login_path}?error=no_challenge')
stored = self.get_passkey(id)
if not stored: return HttpHeader('HX-Redirect', f'{self.login_path}?error=passkey_not_found')
try:
res = verify_authentication_response(
credential=dict(id=id, rawId=rawId, response=loads(response), type=type),
credential_public_key=stored['public_key'],
credential_current_sign_count=stored['sign_count'],
expected_challenge=base64url_to_bytes(challenge_b64),
require_user_verification=True,
expected_origin=self.public_origin, expected_rp_id=self.rp_id)
except Exception: return HttpHeader('HX-Redirect', f'{self.login_path}?error=passkey_failed')
self.update_passkey(id, res.new_sign_count)
session['auth'] = self.get_user_id(stored['email'])
return self._do_auth(session['auth'], session, htmx=True)

def _request_passkey_reg(self, session):
email = session.get('pending_email')
if not email: return RedirectResponse(f'{self.login_path}?error=no_pending_reg', status_code=303)
opts = generate_registration_options(
rp_id=self.rp_id, rp_name=self.rp_name or self.rp_id, user_name=email,
user_id=str(self.get_user_id(email)).encode(),
authenticator_selection=AuthenticatorSelectionCriteria(resident_key=ResidentKeyRequirement.REQUIRED))
session['reg_challenge'] = bytes_to_base64url(opts.challenge)
return _webauthn_js(opts, 'startRegistration', '/finish_passkey_reg')

def _finish_passkey_reg(self, response: str, id: str, rawId: str, type: str, session):
email = session.pop('pending_email', None)
if not email: return RedirectResponse(f'{self.login_path}?error=no_pending_reg', status_code=303)
challenge_b64 = session.pop('reg_challenge', None)
if not challenge_b64: return RedirectResponse('/setup_passkey?error=no_challenge', status_code=303)
try: res = verify_registration_response(credential=dict(id=id, rawId=rawId, response=loads(response), type=type),
expected_challenge=base64url_to_bytes(challenge_b64),
require_user_verification=True,
expected_origin=self.public_origin, expected_rp_id=self.rp_id)
except Exception: return RedirectResponse('/setup_passkey?error=reg_failed', status_code=303)
self.save_passkey(bytes_to_base64url(res.credential_id), email, res.credential_public_key, res.sign_count)
session['auth'] = self.get_user_id(email)
return self._do_auth(session['auth'], session, htmx=True)

def _skip_passkey_reg(self, session):
email = session.pop('pending_email', None)
if not email: return RedirectResponse(self.login_path, status_code=303)
session['auth'] = self.get_user_id(email)
return self._do_auth(session['auth'], session, htmx=False)

def _do_auth(self, user_id, session, htmx=False):
url = self.get_auth(user_id, session)
if htmx: return HttpHeader('HX-Redirect', url)
return RedirectResponse(url, status_code=303)

def get_auth(self, user_id, session): return '/'
def get_user_id(self, email): raise NotImplementedError()
def has_passkey(self, email): raise NotImplementedError()
def get_passkey(self, credential_id): raise NotImplementedError()
def save_passkey(self, credential_id, email, public_key, sign_count): raise NotImplementedError()
def update_passkey(self, credential_id, sign_count): raise NotImplementedError()

def after_magiclink_verify(self, email, session, req):
if self.has_passkey(email):
session['auth'] = self.get_user_id(email)
return self._do_auth(session['auth'], session, htmx=False)
session['pending_email'] = email
return RedirectResponse('/setup_passkey', status_code=303)
Loading
Loading