blob: abf65efe37d98d488132603237589026e04fb0bb (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
# frozen_string_literal: true
require_relative "webauthn_listener/response"
##
# The WebauthnListener class retrieves an OTP after a user successfully WebAuthns with the Gem host.
# An instance opens a socket using the TCPServer instance given and listens for a request from the Gem host.
# The request should be a GET request to the root path and contains the OTP code in the form
# of a query parameter `code`. The listener will return the code which will be used as the OTP for
# API requests.
#
# Types of responses sent by the listener after receiving a request:
# - 200 OK: OTP code was successfully retrieved
# - 204 No Content: If the request was an OPTIONS request
# - 400 Bad Request: If the request did not contain a query parameter `code`
# - 404 Not Found: The request was not to the root path
# - 405 Method Not Allowed: OTP code was not retrieved because the request was not a GET/OPTIONS request
#
# Example usage:
#
# thread = Gem::GemcutterUtilities::WebauthnListener.listener_thread("https://rubygems.example", server)
# thread.join
# otp = thread[:otp]
# error = thread[:error]
#
module Gem::GemcutterUtilities
class WebauthnListener
attr_reader :host
def initialize(host)
@host = host
end
def self.listener_thread(host, server)
Thread.new do
thread = Thread.current
thread.abort_on_exception = true
thread.report_on_exception = false
thread[:otp] = new(host).wait_for_otp_code(server)
rescue Gem::WebauthnVerificationError => e
thread[:error] = e
ensure
server.close
end
end
def wait_for_otp_code(server)
loop do
socket = server.accept
request_line = socket.gets
method, req_uri, _protocol = request_line.split(" ")
req_uri = Gem::URI.parse(req_uri)
responder = SocketResponder.new(socket)
unless root_path?(req_uri)
responder.send(NotFoundResponse.for(host))
raise Gem::WebauthnVerificationError, "Page at #{req_uri.path} not found."
end
case method.upcase
when "OPTIONS"
responder.send(NoContentResponse.for(host))
next # will be GET
when "GET"
if otp = parse_otp_from_uri(req_uri)
responder.send(OkResponse.for(host))
return otp
end
responder.send(BadRequestResponse.for(host))
raise Gem::WebauthnVerificationError, "Did not receive OTP from #{host}."
else
responder.send(MethodNotAllowedResponse.for(host))
raise Gem::WebauthnVerificationError, "Invalid HTTP method #{method.upcase} received."
end
end
end
private
def root_path?(uri)
uri.path == "/"
end
def parse_otp_from_uri(uri)
require "cgi"
return if uri.query.nil?
CGI.parse(uri.query).dig("code", 0)
end
class SocketResponder
def initialize(socket)
@socket = socket
end
def send(response)
@socket.print response.to_s
@socket.close
end
end
end
end
|