@@ -2,7 +2,7 @@ import json
import logging
from collections import deque
from io import SEEK_END, BytesIO
from typing import Callable, Iterable, MutableMapping, Optional, Tuple, Union
from typing import Callable, Dict, Iterable, MutableMapping, Optional, Tuple, Union
import attr
from typing_extensions import Deque
@@ -13,8 +13,11 @@ from twisted.internet._resolver import SimpleResolverComplexifier
from twisted.internet.defer import Deferred, fail, succeed
from twisted.internet.error import DNSLookupError
from twisted.internet.interfaces import (
IHostnameResolver,
IProtocol,
IPullProducer,
IPushProducer,
IReactorPluggableNameResolver,
IReactorTCP,
IResolverSimple,
ITransport,
)
@@ -45,11 +48,11 @@ class FakeChannel:
wire).
"""
site = attr.ib(type=Site)
site = attr.ib(type=Union[ Site, "FakeSite"] )
_reactor = attr.ib()
result = attr.ib(type=dict, default=attr.Factory(dict))
_ip = attr.ib(type=str, default="127.0.0.1")
_producer = None
_producer = None # type: Optional[Union[IPullProducer, IPushProducer]]
@property
def json_body(self):
@@ -159,7 +162,11 @@ class FakeChannel:
Any cookines found are added to the given dict
"""
for h in self.headers.getRawHeaders("Set-Cookie"):
headers = self.headers.getRawHeaders("Set-Cookie")
if not headers:
return
for h in headers:
parts = h.split(";")
k, v = parts[0].split("=", maxsplit=1)
cookies[k] = v
@@ -311,8 +318,8 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
self._tcp_callbacks = {}
self._udp = []
lookups = self.lookups = {}
self._thread_callbacks = deque() # type: Deque[Callable[[], None]]()
lookups = self.lookups = {} # type: Dict[str, str]
self._thread_callbacks = deque() # type: Deque[Callable[[], None]]
@implementer(IResolverSimple)
class FakeResolver:
@@ -324,6 +331,9 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
self.nameResolver = SimpleResolverComplexifier(FakeResolver())
super().__init__()
def installNameResolver(self, resolver: IHostnameResolver) -> IHostnameResolver:
raise NotImplementedError()
def listenUDP(self, port, protocol, interface="", maxPacketSize=8196):
p = udp.Port(port, protocol, interface, maxPacketSize, self)
p.startListening()
@@ -621,7 +631,9 @@ class FakeTransport:
self.disconnected = True
def connect_client(reactor: IReactorTCP, client_id: int) -> AccumulatingProtocol:
def connect_client(
reactor: ThreadedMemoryReactorClock, client_id: int
) -> Tuple[IProtocol, AccumulatingProtocol]:
"""
Connect a client to a fake TCP transport.