Last active
April 1, 2025 07:22
-
-
Save larskanis/d23290db049827149705256206d08caf to your computer and use it in GitHub Desktop.
This is how to access the online service of Microsoft Office 365 by IMAP protocol in Ruby. It uses OAUTH2 authentication through the browser.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# This is how to access the online service of Microsoft Office 365 by IMAP protocol in Ruby. | |
# | |
# It uses OAUTH2 authentication through the browser. | |
# The authentication is done with the Thunderbird client_id, so that it should work equally to your Thunderbird access. | |
# | |
# Adjust your mail address: | |
email_address = '[email protected]' | |
require 'bundler/inline' | |
gemfile do | |
source 'https://rubygems.org' | |
gem 'net-imap', '~> 0.5' | |
gem 'oauth2', '~> 2.0' | |
gem 'openssl' | |
end | |
# The following class is extracred from https://github.com/rubygems/rubygems | |
# | |
# Example usage: | |
# | |
# server = TCPServer.new(0) | |
# otp = Gem::WebauthnListener.wait_for_otp_code("https://rubygems.example", server) | |
# | |
class Gem::WebauthnListener | |
class Response | |
attr_reader :http_response | |
def self.for(host) | |
new(host) | |
end | |
def initialize(host) | |
@host = host | |
build_http_response | |
end | |
def to_s | |
status_line = "HTTP/#{@http_response.http_version} #{@http_response.code} #{@http_response.message}\r\n" | |
headers = @http_response.to_hash.map {|header, value| "#{header}: #{value.join(', ')}\r\n" }.join + "\r\n" | |
body = @http_response.body ? "#{@http_response.body}\n" : "" | |
status_line + headers + body | |
end | |
private | |
# Must be implemented in subclasses | |
def code | |
raise NotImplementedError | |
end | |
def reason_phrase | |
raise NotImplementedError | |
end | |
def body; end | |
def build_http_response | |
response_class = Net::HTTPResponse::CODE_TO_OBJ[code.to_s] | |
@http_response = response_class.new("1.1", code, reason_phrase) | |
@http_response.instance_variable_set(:@read, true) | |
add_connection_header | |
add_access_control_headers | |
add_body | |
end | |
def add_connection_header | |
@http_response["connection"] = "close" | |
end | |
def add_access_control_headers | |
@http_response["access-control-allow-origin"] = @host | |
@http_response["access-control-allow-methods"] = "POST" | |
@http_response["access-control-allow-headers"] = %w[Content-Type Authorization x-csrf-token] | |
end | |
def add_body | |
return unless body | |
@http_response["content-type"] = "text/plain" | |
@http_response["content-length"] = body.bytesize | |
@http_response.instance_variable_set(:@body, body) | |
end | |
end | |
class OkResponse < Response | |
private | |
def code | |
200 | |
end | |
def reason_phrase | |
"OK" | |
end | |
def body | |
"success" | |
end | |
end | |
class NoContentResponse < Response | |
private | |
def code | |
204 | |
end | |
def reason_phrase | |
"No Content" | |
end | |
end | |
class BadRequestResponse < Response | |
private | |
def code | |
400 | |
end | |
def reason_phrase | |
"Bad Request" | |
end | |
def body | |
"missing code parameter" | |
end | |
end | |
class NotFoundResponse < Response | |
private | |
def code | |
404 | |
end | |
def reason_phrase | |
"Not Found" | |
end | |
end | |
class MethodNotAllowedResponse < Response | |
private | |
def code | |
405 | |
end | |
def reason_phrase | |
"Method Not Allowed" | |
end | |
def add_access_control_headers | |
super | |
@http_response["allow"] = %w[GET OPTIONS] | |
end | |
end | |
attr_reader :host | |
def initialize(host) | |
@host = host | |
end | |
def self.wait_for_otp_code(host, server) | |
new(host).fetch_otp_from_connection(server) | |
end | |
def fetch_otp_from_connection(server) | |
loop do | |
retries = 3 | |
begin | |
socket = server.accept | |
rescue OpenSSL::SSL::SSLError => err | |
retry if (retries-=1) > 0 | |
raise | |
end | |
request_line = socket.gets | |
method, req_uri, _protocol = request_line.split(" ") | |
req_uri = URI.parse(req_uri) | |
responder = SocketResponder.new(socket) | |
unless root_path?(req_uri) | |
responder.send(NotFoundResponse.for(host)) | |
raise Gem::WebauthnVerificationError, "Page at #{req_uri.path} not found." | |
end | |
case method.upcase | |
when "OPTIONS" | |
responder.send(NoContentResponse.for(host)) | |
next # will be GET | |
when "GET" | |
if otp = parse_otp_from_uri(req_uri) | |
responder.send(OkResponse.for(host)) | |
return otp | |
end | |
responder.send(BadRequestResponse.for(host)) | |
raise Gem::WebauthnVerificationError, "Did not receive OTP from #{host}." | |
else | |
responder.send(MethodNotAllowedResponse.for(host)) | |
raise Gem::WebauthnVerificationError, "Invalid HTTP method #{method.upcase} received." | |
end | |
end | |
end | |
private | |
def root_path?(uri) | |
uri.path == "/" | |
end | |
def parse_otp_from_uri(uri) | |
require "cgi" | |
return if uri.query.nil? | |
CGI.parse(uri.query).dig("code", 0) | |
end | |
class SocketResponder | |
def initialize(socket) | |
@socket = socket | |
end | |
def send(response) | |
@socket.print response.to_s | |
@socket.close | |
end | |
end | |
end | |
def create_cert(name) | |
key = OpenSSL::PKey::RSA.new 2048 | |
cert = OpenSSL::X509::Certificate.new | |
cert.serial = 0 | |
cert.version = 2 | |
cert.not_before = Time.now | |
cert.not_after = Time.now + 86400 | |
cert.public_key = key.public_key | |
cert_name = OpenSSL::X509::Name.parse "CN=#{name}/DC=example" | |
cert.subject = cert_name | |
extension_factory = OpenSSL::X509::ExtensionFactory.new nil, cert | |
cert.issuer = cert_name | |
extension_factory.issuer_certificate = cert | |
# build a CA cert | |
# This extension indicates the CA’s key may be used as a CA. | |
cert.add_extension extension_factory.create_extension('basicConstraints', 'CA:TRUE', true) | |
# This extension indicates the CA’s key may be used to verify signatures on both certificates and certificate revocations. | |
cert.add_extension extension_factory.create_extension('keyUsage', 'cRLSign,keyCertSign', true) | |
cert.add_extension extension_factory.create_extension('subjectKeyIdentifier', 'hash') | |
# Root CA certificates are self-signed. | |
cert.sign(key, OpenSSL::Digest::SHA256.new) | |
[cert, key] | |
end | |
logon_site = 'https://login.microsoftonline.com' | |
# client_id and empty client_secret of Thunderbird | |
client_id = '9e5f94bc-e8a4-4e73-b8be-63364c29d753' | |
client_secret = '' | |
# Connect to the OAuth2 logon server | |
client = OAuth2::Client.new(client_id, client_secret, site: logon_site, authorize_url: "/common/oauth2/authorize", token_url: "/common/oauth2/token") | |
# Start a local web server with self-signed SSL certs | |
context = OpenSSL::SSL::SSLContext.new | |
context.cert, context.key = create_cert("localhost") | |
server = TCPServer.new(0) | |
sserver = OpenSSL::SSL::SSLServer.new(server, context) | |
port = server.addr[1].to_s | |
redirect_uri = "https://localhost:#{port}" | |
# Build and show the URI with the authentication request | |
auth_url = client.auth_code.authorize_url(redirect_uri: redirect_uri) | |
puts "Please visit the following URI for authentication.\nYou need to accept the invalid certificate warning.\n #{auth_url}" | |
# Wait until the auth_code was received from the browser on the local web server | |
auth_code = Gem::WebauthnListener.wait_for_otp_code(logon_site, sserver) | |
# Request an access token for IMAP | |
access = client.auth_code.get_token(auth_code, redirect_uri: redirect_uri, resource: 'https://outlook.office.com', client_id: client_id) | |
# Connect to the IMAP port | |
imap = Net::IMAP.new('outlook.office365.com', 993, true) | |
imap.authenticate('XOAUTH2', email_address, access.token) | |
# List all imap folders | |
pp imap.list("", "*").map(&:name) # => ["INBOX", "Sent", "Trash", ...] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment