# -*- coding: utf-8 -*-
# Copyright 2019 Paul van Tilburg
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import imaplib
from imaplib import IMAP4
import pwd
from collections import namedtuple
from twisted.internet import defer

import logging

__version__ = "0.1.0"
logger = logging.getLogger(__name__)

# Immutable record holding the parsed provider settings. All five options
# read by IMAPAuthProvider are declared as fields so that logging the record
# shows the effective values.
_Config = namedtuple('_Config', 'create_users host port domain start_tls')


class IMAPAuthProvider:
    """Password auth provider that validates Matrix users against IMAP.

    The provider logs in to the configured IMAP server with the supplied
    credentials and, when allowed by the configuration, registers a Matrix
    account for users that do not have one yet.
    """

    def __init__(self, config, account_handler):
        """
        Args:
            config: The object returned by `parse_config`; must expose the
                attributes `create_users`, `host`, `port`, `domain` and
                `start_tls`.
            account_handler: Object providing `check_user_exists(user_id)`
                and `register(localpart=...)`.
        """
        self.account_handler = account_handler
        self.config = config

    @defer.inlineCallbacks
    def check_password(self, user_id, password):
        """ Attempt to authenticate a user against IMAP
            and register an account if none exists.

            Args:
                user_id: Full Matrix user ID of the form @foo:bar.com.
                password: The password to try against the IMAP server.

            Returns:
                True if authentication against IMAP was successful and the
                user has (or was just given) a Matrix account; False if the
                password is empty, the IMAP server could not be reached or
                rejected the login, or the user has no account and
                `create_users` is disabled.
        """
        if not password:
            defer.returnValue(False)

        # The user_id is of the form @foo:bar.com, get the relevant part
        localpart = user_id.split(":", 1)[0][1:]

        # Append the domain to the user if configured.
        user = localpart
        if self.config.domain:
            user += "@" + self.config.domain

        # Now try to authenticate.
        if not self._imap_login(user, password):
            defer.returnValue(False)

        # From here on, the user is authenticated!
        logger.info("Successfully authenticated user %s", user)

        # An account that already exists needs no creation, so the
        # `create_users` switch must not lock such users out.
        user_exists = yield self.account_handler.check_user_exists(user_id)
        if user_exists:
            defer.returnValue(True)

        # Bail if we don't want to create users in Matrix.
        if not self.config.create_users:
            defer.returnValue(False)

        # Create the user in Matrix since it doesn't exist yet.
        yield self.account_handler.register(localpart=localpart)
        defer.returnValue(True)

    def _imap_login(self, user, password):
        """Return True if the IMAP server accepts `user` / `password`.

        The connection is closed again before returning. Connection and
        STARTTLS failures are logged and reported as False; the password is
        never sent when STARTTLS was requested but could not be established.

        NOTE(review): imaplib is synchronous, so this blocks the calling
        thread (presumably the Twisted reactor) until the server answers --
        consider moving it to a worker thread.
        """
        host = self.config.host
        port = self.config.port
        logger.debug(
            "Attemping IMAP connection with %s:%s (TLS: %s)",
            host, port, self.config.start_tls
        )
        try:
            with IMAP4(host, port) as connection:
                if self.config.start_tls:
                    connection.starttls()
                try:
                    logger.debug("Attempting to login using user: %s", user)
                    connection.login(user, password)
                except imaplib.IMAP4.error as e:
                    logger.info("Could not authenticate user %s: %s", user, e)
                    return False
        except (OSError, imaplib.IMAP4.error) as e:
            # Covers refused/timed-out connections, DNS failures, TLS
            # handshake errors and protocol errors outside of the login.
            logger.warning(
                "IMAP connection to %s:%s failed: %s", host, port, e
            )
            return False
        return True

    @staticmethod
    def parse_config(config):
        """Build the provider settings from a configuration mapping.

        Args:
            config: Mapping with the optional keys `create_users`
                (default True), `host` (default 'localhost'), `port`
                (default 143), `domain` (default None) and `start_tls`
                (default True).

        Returns:
            A record exposing each setting as an attribute of the same name.
        """
        imap_config = _Config(
            create_users=config.get('create_users', True),
            host=config.get('host', 'localhost'),
            port=config.get('port', 143),
            domain=config.get('domain', None),
            start_tls=config.get('start_tls', True),
        )

        logger.debug("Parsed IMAP config: %s", imap_config)

        return imap_config


# This is just here for testing purposes. Feel free to change the `test_config`
# object and `check_password` function arguments.
if __name__ == '__main__':
    class DummyAccountHandler:
        """Stand-in account handler that treats every user as new."""

        def check_user_exists(self, user_id):
            return False

        def register(self, localpart=None):
            print("User %s successfully authenticated, created account"
                  % localpart)

    test_config = {}
    parsed_config = IMAPAuthProvider.parse_config(test_config)
    prov = IMAPAuthProvider(parsed_config, DummyAccountHandler())
    prov.check_password('@user:matrix.tld', 'password')