matrix-synapse-imap/imap_auth_provider.py

111 lines
3.7 KiB
Python
Raw Normal View History

2019-09-20 23:09:51 +02:00
# -*- coding: utf-8 -*-
# Copyright 2019 Paul van Tilburg
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import imaplib
from imaplib import IMAP4
import pwd
from collections import namedtuple
from twisted.internet import defer
import logging
__version__ = "0.1.0"
logger = logging.getLogger(__name__)
class IMAPAuthProvider:
    """Password auth provider that validates Matrix logins against IMAP.

    The provider is constructed with the object returned by `parse_config`
    and an account handler offering `check_user_exists(user_id)` and
    `register(localpart=...)`.
    """

    def __init__(self, config, account_handler):
        """Remember the parsed configuration and the account handler.

        Args:
            config: The object produced by `parse_config`.
            account_handler: Used to look up and register Matrix accounts.
        """
        self.account_handler = account_handler
        self.config = config

    @defer.inlineCallbacks
    def check_password(self, user_id, password):
        """Attempt to authenticate a user against IMAP
        and register an account if none exists.

        Args:
            user_id: Full Matrix user ID of the form ``@foo:bar.com``.
            password: The password supplied by the user.

        Returns:
            A Deferred firing with True if authentication against IMAP was
            successful and the Matrix account exists (or was just created).
            It fires with False if the password is empty, the IMAP login or
            connection fails, or the account does not exist and
            `create_users` is disabled.
        """
        if not password:
            defer.returnValue(False)

        # The user_id is of the form @foo:bar.com, get the relevant part
        localpart = user_id.split(":", 1)[0][1:]

        # Append the domain to the user if configured.
        user = localpart
        if self.config.domain:
            user += "@" + self.config.domain

        # Authenticate first; the IMAP connection is closed again before we
        # touch the account handler.
        if not self._imap_login(user, password):
            defer.returnValue(False)

        # From here on, the user is authenticated!
        logger.info("Successfully authenticated user %s", user)

        user_exists = yield self.account_handler.check_user_exists(user_id)
        if not user_exists:
            # Only a *missing* account is affected by `create_users`; an
            # already existing account must still be able to log in.
            if not self.config.create_users:
                logger.info(
                    "User %s does not exist and user creation is disabled",
                    user_id,
                )
                defer.returnValue(False)
            yield self.account_handler.register(localpart=localpart)

        defer.returnValue(True)

    def _imap_login(self, user, password):
        """Return True if the IMAP server accepts `user` / `password`.

        Connection and STARTTLS failures are logged and reported as a failed
        authentication (False) instead of propagating to the caller.
        """
        # NOTE(review): imaplib is synchronous, so this blocks the calling
        # thread for the duration of the IMAP exchange.
        logger.debug(
            "Attemping IMAP connection with %s:%s (TLS: %s)",
            self.config.host,
            self.config.port,
            self.config.start_tls,
        )
        try:
            with IMAP4(self.config.host, self.config.port) as connection:
                # NOTE(review): with start_tls disabled the credentials are
                # sent over an unencrypted connection. starttls() is called
                # without an explicit SSL context -- confirm that the
                # resulting certificate checks match your requirements.
                if self.config.start_tls:
                    connection.starttls()
                logger.debug("Attempting to login using user: %s", user)
                try:
                    connection.login(user, password)
                except IMAP4.error as e:
                    logger.info("Could not authenticate user %s: %s", user, e)
                    return False
        except (IMAP4.error, OSError) as e:
            # Covers refused/failed connections, bad greetings and TLS errors.
            logger.warning(
                "IMAP connection to %s:%s failed: %s",
                self.config.host,
                self.config.port,
                e,
            )
            return False
        return True

    @staticmethod
    def parse_config(config):
        """Build the provider configuration from the raw config mapping.

        Args:
            config: Mapping with the optional keys `create_users`
                (default True), `host` (default 'localhost'), `port`
                (default 143), `domain` (default None) and `start_tls`
                (default True).

        Returns:
            An immutable named tuple exposing those five settings as
            attributes.
        """
        config_type = namedtuple(
            '_Config', 'create_users host port domain start_tls'
        )
        imap_config = config_type(
            create_users=config.get('create_users', True),
            host=config.get('host', 'localhost'),
            port=config.get('port', 143),
            domain=config.get('domain', None),
            start_tls=config.get('start_tls', True),
        )
        logger.debug("Parsed IMAP config: %s", imap_config)
        return imap_config
# This is just here for testing purposes. Feel free to change the `test_config`
# object and `check_password` function arguments.
if __name__ == '__main__':
    # Without a configured handler the provider's debug/info messages would
    # never be shown when running this file directly.
    logging.basicConfig(level=logging.DEBUG)

    class DummyAccountHandler:
        """Stand-in account handler: no user exists, registering just prints."""

        def check_user_exists(self, user_id):
            return False

        def register(self, localpart=None):
            print("User %s successfully authenticated, created account" % localpart)

    test_config = {}
    parsed_config = IMAPAuthProvider.parse_config(test_config)
    prov = IMAPAuthProvider(parsed_config, DummyAccountHandler())

    # check_password returns a Deferred; report its outcome instead of
    # silently dropping a rejected login or a raised error.
    outcome = prov.check_password('@user:matrix.tld', 'password')
    outcome.addCallbacks(
        lambda ok: print("check_password returned %s" % ok),
        lambda failure: print(
            "check_password failed: %s" % failure.getErrorMessage()
        ),
    )