@@ -0,0 +1 @@ | |||
The register_new_matrix_user script is now ported to Python 3. |
@@ -16,207 +16,7 @@ | |||
from __future__ import print_function | |||
import argparse | |||
import getpass | |||
import hashlib | |||
import hmac | |||
import json | |||
import sys | |||
import urllib2 | |||
from six import input | |||
import yaml | |||
def request_registration(user, password, server_location, shared_secret, admin=False): | |||
req = urllib2.Request( | |||
"%s/_matrix/client/r0/admin/register" % (server_location,), | |||
headers={'Content-Type': 'application/json'}, | |||
) | |||
try: | |||
if sys.version_info[:3] >= (2, 7, 9): | |||
# As of version 2.7.9, urllib2 now checks SSL certs | |||
import ssl | |||
f = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23)) | |||
else: | |||
f = urllib2.urlopen(req) | |||
body = f.read() | |||
f.close() | |||
nonce = json.loads(body)["nonce"] | |||
except urllib2.HTTPError as e: | |||
print("ERROR! Received %d %s" % (e.code, e.reason)) | |||
if 400 <= e.code < 500: | |||
if e.info().type == "application/json": | |||
resp = json.load(e) | |||
if "error" in resp: | |||
print(resp["error"]) | |||
sys.exit(1) | |||
mac = hmac.new(key=shared_secret, digestmod=hashlib.sha1) | |||
mac.update(nonce) | |||
mac.update("\x00") | |||
mac.update(user) | |||
mac.update("\x00") | |||
mac.update(password) | |||
mac.update("\x00") | |||
mac.update("admin" if admin else "notadmin") | |||
mac = mac.hexdigest() | |||
data = { | |||
"nonce": nonce, | |||
"username": user, | |||
"password": password, | |||
"mac": mac, | |||
"admin": admin, | |||
} | |||
server_location = server_location.rstrip("/") | |||
print("Sending registration request...") | |||
req = urllib2.Request( | |||
"%s/_matrix/client/r0/admin/register" % (server_location,), | |||
data=json.dumps(data), | |||
headers={'Content-Type': 'application/json'}, | |||
) | |||
try: | |||
if sys.version_info[:3] >= (2, 7, 9): | |||
# As of version 2.7.9, urllib2 now checks SSL certs | |||
import ssl | |||
f = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23)) | |||
else: | |||
f = urllib2.urlopen(req) | |||
f.read() | |||
f.close() | |||
print("Success.") | |||
except urllib2.HTTPError as e: | |||
print("ERROR! Received %d %s" % (e.code, e.reason)) | |||
if 400 <= e.code < 500: | |||
if e.info().type == "application/json": | |||
resp = json.load(e) | |||
if "error" in resp: | |||
print(resp["error"]) | |||
sys.exit(1) | |||
def register_new_user(user, password, server_location, shared_secret, admin): | |||
if not user: | |||
try: | |||
default_user = getpass.getuser() | |||
except Exception: | |||
default_user = None | |||
if default_user: | |||
user = input("New user localpart [%s]: " % (default_user,)) | |||
if not user: | |||
user = default_user | |||
else: | |||
user = input("New user localpart: ") | |||
if not user: | |||
print("Invalid user name") | |||
sys.exit(1) | |||
if not password: | |||
password = getpass.getpass("Password: ") | |||
if not password: | |||
print("Password cannot be blank.") | |||
sys.exit(1) | |||
confirm_password = getpass.getpass("Confirm password: ") | |||
if password != confirm_password: | |||
print("Passwords do not match") | |||
sys.exit(1) | |||
if admin is None: | |||
admin = input("Make admin [no]: ") | |||
if admin in ("y", "yes", "true"): | |||
admin = True | |||
else: | |||
admin = False | |||
request_registration(user, password, server_location, shared_secret, bool(admin)) | |||
from synapse._scripts.register_new_matrix_user import main | |||
if __name__ == "__main__": | |||
parser = argparse.ArgumentParser( | |||
description="Used to register new users with a given home server when" | |||
" registration has been disabled. The home server must be" | |||
" configured with the 'registration_shared_secret' option" | |||
" set." | |||
) | |||
parser.add_argument( | |||
"-u", | |||
"--user", | |||
default=None, | |||
help="Local part of the new user. Will prompt if omitted.", | |||
) | |||
parser.add_argument( | |||
"-p", | |||
"--password", | |||
default=None, | |||
help="New password for user. Will prompt if omitted.", | |||
) | |||
admin_group = parser.add_mutually_exclusive_group() | |||
admin_group.add_argument( | |||
"-a", | |||
"--admin", | |||
action="store_true", | |||
help=( | |||
"Register new user as an admin. " | |||
"Will prompt if --no-admin is not set either." | |||
), | |||
) | |||
admin_group.add_argument( | |||
"--no-admin", | |||
action="store_true", | |||
help=( | |||
"Register new user as a regular user. " | |||
"Will prompt if --admin is not set either." | |||
), | |||
) | |||
group = parser.add_mutually_exclusive_group(required=True) | |||
group.add_argument( | |||
"-c", | |||
"--config", | |||
type=argparse.FileType('r'), | |||
help="Path to server config file. Used to read in shared secret.", | |||
) | |||
group.add_argument( | |||
"-k", "--shared-secret", help="Shared secret as defined in server config file." | |||
) | |||
parser.add_argument( | |||
"server_url", | |||
default="https://localhost:8448", | |||
nargs='?', | |||
help="URL to use to talk to the home server. Defaults to " | |||
" 'https://localhost:8448'.", | |||
) | |||
args = parser.parse_args() | |||
if "config" in args and args.config: | |||
config = yaml.safe_load(args.config) | |||
secret = config.get("registration_shared_secret", None) | |||
if not secret: | |||
print("No 'registration_shared_secret' defined in config.") | |||
sys.exit(1) | |||
else: | |||
secret = args.shared_secret | |||
admin = None | |||
if args.admin or args.no_admin: | |||
admin = args.admin | |||
register_new_user(args.user, args.password, args.server_url, secret, admin) | |||
main() |
@@ -0,0 +1,215 @@ | |||
# -*- coding: utf-8 -*- | |||
# Copyright 2015, 2016 OpenMarket Ltd | |||
# Copyright 2018 New Vector | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
# You may obtain a copy of the License at | |||
# | |||
# http://www.apache.org/licenses/LICENSE-2.0 | |||
# | |||
# Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
from __future__ import print_function | |||
import argparse | |||
import getpass | |||
import hashlib | |||
import hmac | |||
import logging | |||
import sys | |||
from six.moves import input | |||
import requests as _requests | |||
import yaml | |||
def request_registration( | |||
user, | |||
password, | |||
server_location, | |||
shared_secret, | |||
admin=False, | |||
requests=_requests, | |||
_print=print, | |||
exit=sys.exit, | |||
): | |||
url = "%s/_matrix/client/r0/admin/register" % (server_location,) | |||
# Get the nonce | |||
r = requests.get(url, verify=False) | |||
if r.status_code is not 200: | |||
_print("ERROR! Received %d %s" % (r.status_code, r.reason)) | |||
if 400 <= r.status_code < 500: | |||
try: | |||
_print(r.json()["error"]) | |||
except Exception: | |||
pass | |||
return exit(1) | |||
nonce = r.json()["nonce"] | |||
mac = hmac.new(key=shared_secret.encode('utf8'), digestmod=hashlib.sha1) | |||
mac.update(nonce.encode('utf8')) | |||
mac.update(b"\x00") | |||
mac.update(user.encode('utf8')) | |||
mac.update(b"\x00") | |||
mac.update(password.encode('utf8')) | |||
mac.update(b"\x00") | |||
mac.update(b"admin" if admin else b"notadmin") | |||
mac = mac.hexdigest() | |||
data = { | |||
"nonce": nonce, | |||
"username": user, | |||
"password": password, | |||
"mac": mac, | |||
"admin": admin, | |||
} | |||
_print("Sending registration request...") | |||
r = requests.post(url, json=data, verify=False) | |||
if r.status_code is not 200: | |||
_print("ERROR! Received %d %s" % (r.status_code, r.reason)) | |||
if 400 <= r.status_code < 500: | |||
try: | |||
_print(r.json()["error"]) | |||
except Exception: | |||
pass | |||
return exit(1) | |||
_print("Success!") | |||
def register_new_user(user, password, server_location, shared_secret, admin): | |||
if not user: | |||
try: | |||
default_user = getpass.getuser() | |||
except Exception: | |||
default_user = None | |||
if default_user: | |||
user = input("New user localpart [%s]: " % (default_user,)) | |||
if not user: | |||
user = default_user | |||
else: | |||
user = input("New user localpart: ") | |||
if not user: | |||
print("Invalid user name") | |||
sys.exit(1) | |||
if not password: | |||
password = getpass.getpass("Password: ") | |||
if not password: | |||
print("Password cannot be blank.") | |||
sys.exit(1) | |||
confirm_password = getpass.getpass("Confirm password: ") | |||
if password != confirm_password: | |||
print("Passwords do not match") | |||
sys.exit(1) | |||
if admin is None: | |||
admin = input("Make admin [no]: ") | |||
if admin in ("y", "yes", "true"): | |||
admin = True | |||
else: | |||
admin = False | |||
request_registration(user, password, server_location, shared_secret, bool(admin)) | |||
def main(): | |||
logging.captureWarnings(True) | |||
parser = argparse.ArgumentParser( | |||
description="Used to register new users with a given home server when" | |||
" registration has been disabled. The home server must be" | |||
" configured with the 'registration_shared_secret' option" | |||
" set." | |||
) | |||
parser.add_argument( | |||
"-u", | |||
"--user", | |||
default=None, | |||
help="Local part of the new user. Will prompt if omitted.", | |||
) | |||
parser.add_argument( | |||
"-p", | |||
"--password", | |||
default=None, | |||
help="New password for user. Will prompt if omitted.", | |||
) | |||
admin_group = parser.add_mutually_exclusive_group() | |||
admin_group.add_argument( | |||
"-a", | |||
"--admin", | |||
action="store_true", | |||
help=( | |||
"Register new user as an admin. " | |||
"Will prompt if --no-admin is not set either." | |||
), | |||
) | |||
admin_group.add_argument( | |||
"--no-admin", | |||
action="store_true", | |||
help=( | |||
"Register new user as a regular user. " | |||
"Will prompt if --admin is not set either." | |||
), | |||
) | |||
group = parser.add_mutually_exclusive_group(required=True) | |||
group.add_argument( | |||
"-c", | |||
"--config", | |||
type=argparse.FileType('r'), | |||
help="Path to server config file. Used to read in shared secret.", | |||
) | |||
group.add_argument( | |||
"-k", "--shared-secret", help="Shared secret as defined in server config file." | |||
) | |||
parser.add_argument( | |||
"server_url", | |||
default="https://localhost:8448", | |||
nargs='?', | |||
help="URL to use to talk to the home server. Defaults to " | |||
" 'https://localhost:8448'.", | |||
) | |||
args = parser.parse_args() | |||
if "config" in args and args.config: | |||
config = yaml.safe_load(args.config) | |||
secret = config.get("registration_shared_secret", None) | |||
if not secret: | |||
print("No 'registration_shared_secret' defined in config.") | |||
sys.exit(1) | |||
else: | |||
secret = args.shared_secret | |||
admin = None | |||
if args.admin or args.no_admin: | |||
admin = args.admin | |||
register_new_user(args.user, args.password, args.server_url, secret, admin) | |||
if __name__ == "__main__": | |||
main() |
@@ -0,0 +1,160 @@ | |||
# -*- coding: utf-8 -*- | |||
# Copyright 2018 New Vector | |||
# | |||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | |||
# You may obtain a copy of the License at | |||
# | |||
# http://www.apache.org/licenses/LICENSE-2.0 | |||
# | |||
# Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
from mock import Mock | |||
from synapse._scripts.register_new_matrix_user import request_registration | |||
from tests.unittest import TestCase | |||
class RegisterTestCase(TestCase): | |||
def test_success(self): | |||
""" | |||
The script will fetch a nonce, and then generate a MAC with it, and then | |||
post that MAC. | |||
""" | |||
def get(url, verify=None): | |||
r = Mock() | |||
r.status_code = 200 | |||
r.json = lambda: {"nonce": "a"} | |||
return r | |||
def post(url, json=None, verify=None): | |||
# Make sure we are sent the correct info | |||
self.assertEqual(json["username"], "user") | |||
self.assertEqual(json["password"], "pass") | |||
self.assertEqual(json["nonce"], "a") | |||
# We want a 40-char hex MAC | |||
self.assertEqual(len(json["mac"]), 40) | |||
r = Mock() | |||
r.status_code = 200 | |||
return r | |||
requests = Mock() | |||
requests.get = get | |||
requests.post = post | |||
# The fake stdout will be written here | |||
out = [] | |||
err_code = [] | |||
request_registration( | |||
"user", | |||
"pass", | |||
"matrix.org", | |||
"shared", | |||
admin=False, | |||
requests=requests, | |||
_print=out.append, | |||
exit=err_code.append, | |||
) | |||
# We should get the success message making sure everything is OK. | |||
self.assertIn("Success!", out) | |||
# sys.exit shouldn't have been called. | |||
self.assertEqual(err_code, []) | |||
def test_failure_nonce(self): | |||
""" | |||
If the script fails to fetch a nonce, it throws an error and quits. | |||
""" | |||
def get(url, verify=None): | |||
r = Mock() | |||
r.status_code = 404 | |||
r.reason = "Not Found" | |||
r.json = lambda: {"not": "error"} | |||
return r | |||
requests = Mock() | |||
requests.get = get | |||
# The fake stdout will be written here | |||
out = [] | |||
err_code = [] | |||
request_registration( | |||
"user", | |||
"pass", | |||
"matrix.org", | |||
"shared", | |||
admin=False, | |||
requests=requests, | |||
_print=out.append, | |||
exit=err_code.append, | |||
) | |||
# Exit was called | |||
self.assertEqual(err_code, [1]) | |||
# We got an error message | |||
self.assertIn("ERROR! Received 404 Not Found", out) | |||
self.assertNotIn("Success!", out) | |||
def test_failure_post(self): | |||
""" | |||
The script will fetch a nonce, and then if the final POST fails, will | |||
report an error and quit. | |||
""" | |||
def get(url, verify=None): | |||
r = Mock() | |||
r.status_code = 200 | |||
r.json = lambda: {"nonce": "a"} | |||
return r | |||
def post(url, json=None, verify=None): | |||
# Make sure we are sent the correct info | |||
self.assertEqual(json["username"], "user") | |||
self.assertEqual(json["password"], "pass") | |||
self.assertEqual(json["nonce"], "a") | |||
# We want a 40-char hex MAC | |||
self.assertEqual(len(json["mac"]), 40) | |||
r = Mock() | |||
# Then 500 because we're jerks | |||
r.status_code = 500 | |||
r.reason = "Broken" | |||
return r | |||
requests = Mock() | |||
requests.get = get | |||
requests.post = post | |||
# The fake stdout will be written here | |||
out = [] | |||
err_code = [] | |||
request_registration( | |||
"user", | |||
"pass", | |||
"matrix.org", | |||
"shared", | |||
admin=False, | |||
requests=requests, | |||
_print=out.append, | |||
exit=err_code.append, | |||
) | |||
# Exit was called | |||
self.assertEqual(err_code, [1]) | |||
# We got an error message | |||
self.assertIn("ERROR! Received 500 Broken", out) | |||
self.assertNotIn("Success!", out) |