@@ -15,7 +15,7 @@ | |||
# limitations under the License. | |||
from synapse.api.events.utils import prune_event | |||
from synapse.events.utils import prune_event | |||
from syutil.jsonutil import encode_canonical_json | |||
from syutil.base64util import encode_base64, decode_base64 | |||
from syutil.crypto.jsonsign import sign_json | |||
@@ -85,10 +85,10 @@ class EventBase(object): | |||
return hasattr(self, "state_key") | |||
def get_dict(self): | |||
d = dict(self._original) | |||
d = dict(self._event_dict) | |||
d.update({ | |||
"signatures": self._signatures, | |||
"unsigned": self._unsigned, | |||
"signatures": self.signatures, | |||
"unsigned": self.unsigned, | |||
}) | |||
return d | |||
@@ -128,7 +128,7 @@ class FrozenEvent(EventBase): | |||
@staticmethod | |||
def from_event(event): | |||
e = FrozenEvent( | |||
event.event_dict() | |||
event.get_pdu_json() | |||
) | |||
e.internal_metadata = event.internal_metadata | |||
@@ -22,10 +22,13 @@ from synapse.util.stringutils import random_string | |||
class EventBuilder(EventBase): | |||
def __init__(self, key_values={}): | |||
super(FrozenEvent, self).__init__( | |||
super(EventBuilder, self).__init__( | |||
key_values, | |||
) | |||
def update_event_key(self, key, value): | |||
self._event_dict[key] = value | |||
def update_event_keys(self, other_dict): | |||
self._event_dict.update(other_dict) | |||
@@ -74,6 +74,7 @@ class ReplicationLayer(object): | |||
self._clock = hs.get_clock() | |||
self.event_factory = hs.get_event_factory() | |||
self.event_builder_factory = hs.get_event_builder_factory() | |||
def set_handler(self, handler): | |||
"""Sets the handler that the replication layer will use to communicate | |||
@@ -658,19 +659,14 @@ class ReplicationLayer(object): | |||
return "<ReplicationLayer(%s)>" % self.server_name | |||
def event_from_pdu_json(self, pdu_json, outlier=False): | |||
#TODO: Check we have all the PDU keys here | |||
pdu_json.setdefault("hashes", {}) | |||
pdu_json.setdefault("signatures", {}) | |||
sender = pdu_json.pop("sender", None) | |||
if sender is not None: | |||
pdu_json["user_id"] = sender | |||
state_hash = pdu_json.get("unsigned", {}).pop("state_hash", None) | |||
if state_hash is not None: | |||
pdu_json["state_hash"] = state_hash | |||
return self.event_factory.create_event( | |||
pdu_json["type"], outlier=outlier, **pdu_json | |||
builder = self.event_builder_factory.new( | |||
pdu_json | |||
) | |||
builder.internal_metadata = outlier | |||
return builder.build() | |||
class _TransactionQueue(object): | |||
"""This class makes sure we only have one transaction in flight at | |||
@@ -46,6 +46,8 @@ class BaseHandler(object): | |||
self.signing_key = hs.config.signing_key[0] | |||
self.server_name = hs.hostname | |||
self.event_builder_factory = hs.get_event_builder_factory() | |||
def ratelimit(self, user_id): | |||
time_now = self.clock.time() | |||
allowed, time_allowed = self.ratelimiter.send_message( | |||
@@ -92,7 +94,7 @@ class BaseHandler(object): | |||
builder.prev_events = prev_events | |||
builder.depth = depth | |||
auth_events = yield self.auth.get_event_auth(builder, curr_state) | |||
auth_events = yield self.auth.get_auth_events(builder, curr_state) | |||
builder.update_event_key("auth_events", auth_events) | |||
@@ -105,7 +107,7 @@ class BaseHandler(object): | |||
auth_ids = zip(*auth_events)[0] | |||
curr_auth_events = { | |||
k: v | |||
for k, v in curr_state | |||
for k, v in curr_state.items() | |||
if v.event_id in auth_ids | |||
} | |||
@@ -119,14 +121,16 @@ class BaseHandler(object): | |||
) | |||
@defer.inlineCallbacks | |||
def _handle_new_client_event(self, event, context): | |||
def handle_new_client_event(self, event, context, extra_destinations=[], | |||
extra_users=[], suppress_auth=False): | |||
# We now need to go and hit out to wherever we need to hit out to. | |||
self.auth.check(event, auth_events=context.auth_events) | |||
if not suppress_auth: | |||
self.auth.check(event, auth_events=context.auth_events) | |||
yield self.store.persist_event(event) | |||
destinations = set() | |||
destinations = set(extra_destinations) | |||
for k, s in context.current_state.items(): | |||
try: | |||
if k[0] == EventTypes.Member: | |||
@@ -139,7 +143,7 @@ class BaseHandler(object): | |||
"Failed to get destination from event %s", s.event_id | |||
) | |||
yield self.notifier.on_new_room_event(event) | |||
yield self.notifier.on_new_room_event(event, extra_users=extra_users) | |||
federation_handler = self.hs.get_handlers().federation_handler | |||
yield federation_handler.handle_new_event( | |||
@@ -148,16 +148,12 @@ class DirectoryHandler(BaseHandler): | |||
def send_room_alias_update_event(self, user_id, room_id): | |||
aliases = yield self.store.get_aliases_for_room(room_id) | |||
event = self.event_factory.create_event( | |||
etype=RoomAliasesEvent.TYPE, | |||
state_key=self.hs.hostname, | |||
room_id=room_id, | |||
user_id=user_id, | |||
content={"aliases": aliases}, | |||
) | |||
snapshot = yield self.store.snapshot_room(event) | |||
msg_handler = self.hs.get_handlers().message_handler | |||
yield msg_handler.handle_event({ | |||
"type": RoomAliasesEvent.TYPE, | |||
"state_key": self.hs.hostname, | |||
"room_id": room_id, | |||
"sender": user_id, | |||
"content": {"aliases": aliases}, | |||
}) | |||
yield self._on_new_room_event( | |||
event, snapshot, extra_users=[user_id], suppress_auth=True | |||
) |
@@ -421,16 +421,17 @@ class FederationHandler(BaseHandler): | |||
join event for the room and return that. We don *not* persist or | |||
process it until the other server has signed it and sent it back. | |||
""" | |||
event = self.event_factory.create_event( | |||
etype=RoomMemberEvent.TYPE, | |||
content={"membership": Membership.JOIN}, | |||
room_id=context, | |||
user_id=user_id, | |||
state_key=user_id, | |||
) | |||
builder = self.event_builder_factory.new({ | |||
"type": RoomMemberEvent.TYPE, | |||
"content": {"membership": Membership.JOIN}, | |||
"room_id": context, | |||
"sender": user_id, | |||
"state_key": user_id, | |||
}) | |||
snapshot = yield self.store.snapshot_room(event) | |||
snapshot.fill_out_prev_events(event) | |||
event, context = yield self._create_new_client_event( | |||
builder=builder, | |||
) | |||
yield self.state_handler.annotate_event_with_state(event) | |||
yield self.auth.add_auth_events(event) | |||
@@ -15,7 +15,7 @@ | |||
from twisted.internet import defer | |||
from synapse.api.constants import Membership | |||
from synapse.api.constants import EventTypes, Membership | |||
from synapse.api.errors import RoomError | |||
from synapse.streams.config import PaginationConfig | |||
from synapse.util.logcontext import PreserveLoggingContext | |||
@@ -133,6 +133,27 @@ class MessageHandler(BaseHandler): | |||
defer.returnValue(chunk) | |||
@defer.inlineCallbacks | |||
def handle_event(self, event_dict): | |||
builder = self.event_builder_factory.new(event_dict) | |||
event, context = yield self._create_new_client_event( | |||
builder=builder, | |||
) | |||
# TODO: self.validator.validate(event) | |||
if event.type == EventTypes.Member: | |||
member_handler = self.hs.get_handlers().room_member_handler | |||
yield member_handler.change_membership(event, context) | |||
else: | |||
yield self.handle_new_client_event( | |||
event=event, | |||
context=context, | |||
) | |||
defer.returnValue(event) | |||
@defer.inlineCallbacks | |||
def store_room_data(self, event=None): | |||
""" Stores data for a room. | |||
@@ -210,14 +210,11 @@ class ProfileHandler(BaseHandler): | |||
"collect_presencelike_data", user, content | |||
) | |||
new_event = self.event_factory.create_event( | |||
etype=j.type, | |||
room_id=j.room_id, | |||
state_key=j.state_key, | |||
content=content, | |||
user_id=j.state_key, | |||
) | |||
yield self._on_new_room_event( | |||
new_event, snapshot, suppress_auth=True | |||
) | |||
msg_handler = self.hs.get_handlers().message_handler | |||
yield msg_handler.handle_event({ | |||
"type": j.type, | |||
"room_id": j.room_id, | |||
"state_key": j.state_key, | |||
"content": content, | |||
"sender": j.state_key, | |||
}) |
@@ -123,59 +123,37 @@ class RoomCreationHandler(BaseHandler): | |||
user, room_id, is_public=is_public | |||
) | |||
room_member_handler = self.hs.get_handlers().room_member_handler | |||
@defer.inlineCallbacks | |||
def handle_event(event): | |||
snapshot = yield self.store.snapshot_room(event) | |||
logger.debug("Event: %s", event) | |||
if event.type == RoomMemberEvent.TYPE: | |||
yield room_member_handler.change_membership( | |||
event, | |||
do_auth=True | |||
) | |||
else: | |||
yield self._on_new_room_event( | |||
event, snapshot, extra_users=[user], suppress_auth=True | |||
) | |||
msg_handler = self.hs.get_handlers().message_handler | |||
for event in creation_events: | |||
yield handle_event(event) | |||
yield msg_handler.handle_event(event) | |||
if "name" in config: | |||
name = config["name"] | |||
name_event = self.event_factory.create_event( | |||
etype=RoomNameEvent.TYPE, | |||
room_id=room_id, | |||
user_id=user_id, | |||
content={"name": name}, | |||
) | |||
yield handle_event(name_event) | |||
yield msg_handler.handle_event({ | |||
"type": RoomNameEvent.TYPE, | |||
"room_id": room_id, | |||
"sender": user_id, | |||
"content": {"name": name}, | |||
}) | |||
if "topic" in config: | |||
topic = config["topic"] | |||
topic_event = self.event_factory.create_event( | |||
etype=RoomTopicEvent.TYPE, | |||
room_id=room_id, | |||
user_id=user_id, | |||
content={"topic": topic}, | |||
) | |||
yield msg_handler.handle_event({ | |||
"type": RoomTopicEvent.TYPE, | |||
"room_id": room_id, | |||
"sender": user_id, | |||
"content": {"topic": topic}, | |||
}) | |||
yield handle_event(topic_event) | |||
content = {"membership": Membership.INVITE} | |||
for invitee in invite_list: | |||
invite_event = self.event_factory.create_event( | |||
etype=RoomMemberEvent.TYPE, | |||
state_key=invitee, | |||
room_id=room_id, | |||
user_id=user_id, | |||
content=content | |||
) | |||
yield handle_event(invite_event) | |||
yield msg_handler.handle_event({ | |||
"type": RoomMemberEvent.TYPE, | |||
"state_key": invitee, | |||
"room_id": room_id, | |||
"user_id": user_id, | |||
"content": {"membership": Membership.INVITE}, | |||
}) | |||
result = {"room_id": room_id} | |||
@@ -192,22 +170,25 @@ class RoomCreationHandler(BaseHandler): | |||
event_keys = { | |||
"room_id": room_id, | |||
"user_id": creator_id, | |||
"sender": creator_id, | |||
} | |||
def create(etype, **content): | |||
return self.event_factory.create_event( | |||
etype=etype, | |||
content=content, | |||
**event_keys | |||
) | |||
def create(etype, content): | |||
e = { | |||
"type": etype, | |||
"content": content, | |||
} | |||
e.update(event_keys) | |||
return e | |||
creation_event = create( | |||
etype=RoomCreateEvent.TYPE, | |||
creator=creator.to_string(), | |||
content={"creator": creator.to_string()}, | |||
) | |||
join_event = self.event_factory.create_event( | |||
join_event = create( | |||
etype=RoomMemberEvent.TYPE, | |||
state_key=creator_id, | |||
content={ | |||
@@ -216,7 +197,7 @@ class RoomCreationHandler(BaseHandler): | |||
**event_keys | |||
) | |||
power_levels_event = self.event_factory.create_event( | |||
power_levels_event = create( | |||
etype=RoomPowerLevelsEvent.TYPE, | |||
content={ | |||
"users": { | |||
@@ -233,13 +214,12 @@ class RoomCreationHandler(BaseHandler): | |||
"kick": 50, | |||
"redact": 50 | |||
}, | |||
**event_keys | |||
) | |||
join_rule = JoinRules.PUBLIC if is_public else JoinRules.INVITE | |||
join_rules_event = create( | |||
etype=RoomJoinRulesEvent.TYPE, | |||
join_rule=join_rule, | |||
content={"join_rule": join_rule}, | |||
) | |||
return [ | |||
@@ -351,7 +331,7 @@ class RoomMemberHandler(BaseHandler): | |||
defer.returnValue(member) | |||
@defer.inlineCallbacks | |||
def change_membership(self, event=None, do_auth=True): | |||
def change_membership(self, event, context, do_auth=True): | |||
""" Change the membership status of a user in a room. | |||
Args: | |||
@@ -361,8 +341,6 @@ class RoomMemberHandler(BaseHandler): | |||
""" | |||
target_user_id = event.state_key | |||
snapshot = yield self.store.snapshot_room(event) | |||
## TODO(markjh): get prev state from snapshot. | |||
prev_state = yield self.store.get_room_member( | |||
target_user_id, event.room_id | |||
@@ -374,7 +352,7 @@ class RoomMemberHandler(BaseHandler): | |||
# if this HS is not currently in the room, i.e. we have to do the | |||
# invite/join dance. | |||
if event.membership == Membership.JOIN: | |||
yield self._do_join(event, snapshot, do_auth=do_auth) | |||
yield self._do_join(event, context, do_auth=do_auth) | |||
else: | |||
# This is not a JOIN, so we can handle it normally. | |||
@@ -387,7 +365,7 @@ class RoomMemberHandler(BaseHandler): | |||
yield self._do_local_membership_update( | |||
event, | |||
membership=event.content["membership"], | |||
snapshot=snapshot, | |||
context=context, | |||
do_auth=do_auth, | |||
) | |||
@@ -409,23 +387,21 @@ class RoomMemberHandler(BaseHandler): | |||
host = hosts[0] | |||
content.update({"membership": Membership.JOIN}) | |||
new_event = self.event_factory.create_event( | |||
etype=RoomMemberEvent.TYPE, | |||
state_key=joinee.to_string(), | |||
room_id=room_id, | |||
user_id=joinee.to_string(), | |||
membership=Membership.JOIN, | |||
content=content, | |||
) | |||
snapshot = yield self.store.snapshot_room(new_event) | |||
event, context = yield self.create_new_client_event({ | |||
"type": RoomMemberEvent.TYPE, | |||
"state_key": joinee.to_string(), | |||
"room_id": room_id, | |||
"sender": joinee.to_string(), | |||
"membership": Membership.JOIN, | |||
"content": content, | |||
}) | |||
yield self._do_join(new_event, snapshot, room_host=host, do_auth=True) | |||
yield self._do_join(event, context, room_host=host, do_auth=True) | |||
defer.returnValue({"room_id": room_id}) | |||
@defer.inlineCallbacks | |||
def _do_join(self, event, snapshot, room_host=None, do_auth=True): | |||
def _do_join(self, event, context, room_host=None, do_auth=True): | |||
joinee = self.hs.parse_userid(event.state_key) | |||
# room_id = RoomID.from_string(event.room_id, self.hs) | |||
room_id = event.room_id | |||
@@ -470,7 +446,7 @@ class RoomMemberHandler(BaseHandler): | |||
if should_do_dance: | |||
handler = self.hs.get_handlers().federation_handler | |||
have_joined = yield handler.do_invite_join( | |||
room_host, room_id, event.user_id, event.content, snapshot | |||
room_host, room_id, event.user_id, event.content, context | |||
) | |||
# We want to do the _do_update inside the room lock. | |||
@@ -480,7 +456,7 @@ class RoomMemberHandler(BaseHandler): | |||
yield self._do_local_membership_update( | |||
event, | |||
membership=event.content["membership"], | |||
snapshot=snapshot, | |||
context=context, | |||
do_auth=do_auth, | |||
) | |||
@@ -530,7 +506,7 @@ class RoomMemberHandler(BaseHandler): | |||
defer.returnValue(room_ids) | |||
@defer.inlineCallbacks | |||
def _do_local_membership_update(self, event, membership, snapshot, | |||
def _do_local_membership_update(self, event, membership, context, | |||
do_auth): | |||
yield run_on_reactor() | |||
@@ -543,9 +519,9 @@ class RoomMemberHandler(BaseHandler): | |||
else: | |||
do_invite_host = None | |||
yield self._on_new_room_event( | |||
yield self.handle_new_client_event( | |||
event, | |||
snapshot, | |||
context, | |||
extra_users=[target_user], | |||
suppress_auth=(not do_auth), | |||
do_invite_host=do_invite_host, | |||
@@ -63,7 +63,7 @@ class RestServlet(object): | |||
self.hs = hs | |||
self.handlers = hs.get_handlers() | |||
self.event_factory = hs.get_event_factory() | |||
self.builder_factory = hs.get_event_builder_factory() | |||
self.auth = hs.get_auth() | |||
self.txns = HttpTransactionStore() | |||
@@ -117,10 +117,10 @@ class RoomStateEventRestServlet(RestServlet): | |||
self.on_PUT_no_state_key) | |||
def on_GET_no_state_key(self, request, room_id, event_type): | |||
return self.on_GET(request, room_id, event_type, "") | |||
return self.on_GET(request, room_id, event_type, None) | |||
def on_PUT_no_state_key(self, request, room_id, event_type): | |||
return self.on_PUT(request, room_id, event_type, "") | |||
return self.on_PUT(request, room_id, event_type, None) | |||
@defer.inlineCallbacks | |||
def on_GET(self, request, room_id, event_type, state_key): | |||
@@ -147,28 +147,18 @@ class RoomStateEventRestServlet(RestServlet): | |||
content = _parse_json(request) | |||
event = self.event_factory.create_event( | |||
etype=event_type, # already urldecoded | |||
content=content, | |||
room_id=urllib.unquote(room_id), | |||
user_id=user.to_string(), | |||
state_key=urllib.unquote(state_key) | |||
) | |||
self.validator.validate(event) | |||
msg_handler = self.handlers.message_handler | |||
yield msg_handler.handle_event( | |||
{ | |||
"type": event_type, | |||
"content": content, | |||
"room_id": room_id, | |||
"sender": user.to_string(), | |||
"state_key": urllib.unquote(state_key), | |||
} | |||
) | |||
if event_type == RoomMemberEvent.TYPE: | |||
# membership events are special | |||
handler = self.handlers.room_member_handler | |||
yield handler.change_membership(event) | |||
defer.returnValue((200, {})) | |||
else: | |||
# store random bits of state | |||
msg_handler = self.handlers.message_handler | |||
yield msg_handler.store_room_data( | |||
event=event | |||
) | |||
defer.returnValue((200, {})) | |||
defer.returnValue((200, {})) | |||
# TODO: Needs unit testing for generic events + feedback | |||
@@ -184,17 +174,15 @@ class RoomSendEventRestServlet(RestServlet): | |||
user = yield self.auth.get_user_by_req(request) | |||
content = _parse_json(request) | |||
event = self.event_factory.create_event( | |||
etype=urllib.unquote(event_type), | |||
room_id=urllib.unquote(room_id), | |||
user_id=user.to_string(), | |||
content=content | |||
) | |||
self.validator.validate(event) | |||
msg_handler = self.handlers.message_handler | |||
yield msg_handler.send_message(event) | |||
event = yield msg_handler.handle_event( | |||
{ | |||
"type": urllib.unquote(event_type), | |||
"content": content, | |||
"room_id": urllib.unquote(room_id), | |||
"sender": user.to_string(), | |||
} | |||
) | |||
defer.returnValue((200, {"event_id": event.event_id})) | |||
@@ -251,18 +239,17 @@ class JoinRoomAliasServlet(RestServlet): | |||
ret_dict = yield handler.join_room_alias(user, identifier) | |||
defer.returnValue((200, ret_dict)) | |||
else: # room id | |||
event = self.event_factory.create_event( | |||
etype=RoomMemberEvent.TYPE, | |||
content={"membership": Membership.JOIN}, | |||
room_id=urllib.unquote(identifier.to_string()), | |||
user_id=user.to_string(), | |||
state_key=user.to_string() | |||
msg_handler = self.handlers.message_handler | |||
yield msg_handler.handle_event( | |||
{ | |||
"type": RoomMemberEvent.TYPE, | |||
"content": {"membership": Membership.JOIN}, | |||
"room_id": urllib.unquote(identifier.to_string()), | |||
"sender": user.to_string(), | |||
"state_key": user.to_string(), | |||
} | |||
) | |||
self.validator.validate(event) | |||
handler = self.handlers.room_member_handler | |||
yield handler.change_membership(event) | |||
defer.returnValue((200, {})) | |||
@defer.inlineCallbacks | |||
@@ -414,18 +401,17 @@ class RoomMembershipRestServlet(RestServlet): | |||
if membership_action == "kick": | |||
membership_action = "leave" | |||
event = self.event_factory.create_event( | |||
etype=RoomMemberEvent.TYPE, | |||
content={"membership": unicode(membership_action)}, | |||
room_id=urllib.unquote(room_id), | |||
user_id=user.to_string(), | |||
state_key=state_key | |||
msg_handler = self.handlers.message_handler | |||
yield msg_handler.handle_event( | |||
{ | |||
"type": RoomMemberEvent.TYPE, | |||
"content": {"membership": unicode(membership_action)}, | |||
"room_id": urllib.unquote(room_id), | |||
"sender": user.to_string(), | |||
"state_key": state_key, | |||
} | |||
) | |||
self.validator.validate(event) | |||
handler = self.handlers.room_member_handler | |||
yield handler.change_membership(event) | |||
defer.returnValue((200, {})) | |||
@defer.inlineCallbacks | |||
@@ -453,18 +439,16 @@ class RoomRedactEventRestServlet(RestServlet): | |||
user = yield self.auth.get_user_by_req(request) | |||
content = _parse_json(request) | |||
event = self.event_factory.create_event( | |||
etype=RoomRedactionEvent.TYPE, | |||
room_id=urllib.unquote(room_id), | |||
user_id=user.to_string(), | |||
content=content, | |||
redacts=urllib.unquote(event_id), | |||
) | |||
self.validator.validate(event) | |||
msg_handler = self.handlers.message_handler | |||
yield msg_handler.send_message(event) | |||
event = yield msg_handler.handle_event( | |||
{ | |||
"type": RoomRedactionEvent.TYPE, | |||
"content": content, | |||
"room_id": urllib.unquote(room_id), | |||
"sender": user.to_string(), | |||
"redacts": urllib.unquote(event_id), | |||
} | |||
) | |||
defer.returnValue((200, {"event_id": event.event_id})) | |||
@@ -36,6 +36,7 @@ from synapse.util.lockutils import LockManager | |||
from synapse.streams.events import EventSources | |||
from synapse.api.ratelimiting import Ratelimiter | |||
from synapse.crypto.keyring import Keyring | |||
from synapse.events.builder import EventBuilderFactory | |||
class BaseHomeServer(object): | |||
@@ -82,6 +83,7 @@ class BaseHomeServer(object): | |||
'ratelimiter', | |||
'keyring', | |||
'event_validator', | |||
'event_builder_factory', | |||
] | |||
def __init__(self, hostname, **kwargs): | |||
@@ -231,6 +233,12 @@ class HomeServer(BaseHomeServer): | |||
def build_event_validator(self): | |||
return EventValidator(self) | |||
def build_event_builder_factory(self): | |||
return EventBuilderFactory( | |||
clock=self.get_clock(), | |||
hostname=self.hostname, | |||
) | |||
def register_servlets(self): | |||
""" Register all servlets associated with this HomeServer. | |||
""" | |||
@@ -13,6 +13,8 @@ | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
from twisted.internet import defer | |||
from _base import SQLBaseStore | |||
from syutil.base64util import encode_base64 | |||
@@ -69,8 +71,9 @@ class SignatureStore(SQLBaseStore): | |||
f | |||
) | |||
@defer.inlineCallbacks | |||
def add_event_hashes(self, event_ids): | |||
hashes = yield self.store.get_event_reference_hashes( | |||
hashes = yield self.get_event_reference_hashes( | |||
event_ids | |||
) | |||
hashes = [ | |||