Commit 18a804f8 authored by Scott Wittenburg's avatar Scott Wittenburg Committed by Code Review
Browse files

Merge topic 'pvweb-update-autobahn-for-longpoll' into master

81a6eb04 Update autobahn client/server as well as vtkweb connect for longpoll.
parents d5876d34 81a6eb04
......@@ -16,5 +16,5 @@
##
###############################################################################
__version__ = "0.8.13"
__version__ = "0.8.14"
version = __version__ # backward compat.
......@@ -119,7 +119,7 @@ def pbkdf2_bin(data, salt, iterations = 1000, keylen = 32, hashfunc = None):
def derive_key(secret, salt, iterations = 1000, keylen = 32):
def derive_key(secret, salt, iterations = None, keylen = None):
"""
Computes a derived cryptographic key from a password according to PBKDF2.
......@@ -136,7 +136,7 @@ def derive_key(secret, salt, iterations = 1000, keylen = 32):
:returns: str -- The derived key in Base64 encoding.
"""
key = pbkdf2_bin(secret, salt, iterations, keylen)
key = pbkdf2_bin(secret, salt, iterations or 1000, keylen or 32)
return binascii.b2a_base64(key).strip()
......@@ -171,4 +171,4 @@ def compute_wcs(key, challenge):
:returns: str -- The authentication signature.
"""
sig = hmac.new(key, challenge, hashlib.sha256).digest()
return binascii.b2a_base64(sig).strip()
return binascii.b2a_base64(sig).strip().decode('ascii')
......@@ -288,6 +288,16 @@ class ISession(object):
"""
@abc.abstractmethod
def onChallenge(self, challenge):
"""
Callback fired when the peer demands authentication.
:param challenge: The authentication challenge.
:type challenge: Instance of :class:`autobahn.wamp.types.Challenge`.
"""
@abc.abstractmethod
def onJoin(self, details):
"""
......@@ -359,32 +369,33 @@ class ICaller(ISession):
"""
Call a remote procedure.
This will return a Deferred/Future, that when resolved, provides the actual result.
This will return a Deferred/Future, that when resolved, provides the actual result
returned by the called remote procedure.
If the result is a single positional return value, it'll be returned "as-is". If the
result contains multiple positional return values or keyword return values,
the result is wrapped in an instance of :class:`autobahn.wamp.types.CallResult`.
- If the result is a single positional return value, it'll be returned "as-is".
If the call fails, the returned Deferred/Future will be rejected with an instance
of :class:`autobahn.wamp.exception.ApplicationError`.
- If the result contains multiple positional return values or keyword return values,
the result is wrapped in an instance of :class:`autobahn.wamp.types.CallResult`.
If the *Caller* and *Dealer* implementations support canceling of calls, the call may
be canceled by canceling the returned Deferred/Future.
- If the call fails, the returned Deferred/Future will be rejected with an instance
of :class:`autobahn.wamp.exception.ApplicationError`.
If ``kwargs`` contains an ``options`` keyword argument that is an instance of
:class:`autobahn.wamp.types.CallOptions`, this will provide
specific options for the call to perform.
:class:`autobahn.wamp.types.CallOptions`, this will provide specific options for
the call to perform.
:param procedure: The URI of the remote procedure to be called, e.g. ``"com.myapp.hello"``.
:type procedure: str
When the *Caller* and *Dealer* implementations support canceling of calls, the call may
be canceled by canceling the returned Deferred/Future.
:param procedure: The URI of the remote procedure to be called, e.g. ``u"com.myapp.hello"``.
:type procedure: unicode
:param args: Any positional arguments for the call.
:type args: list
:param kwargs: Any keyword arguments for the call.
:type kwargs: dict
:returns: obj -- A Deferred/Future for the call result -
an instance of :class:`twisted.internet.defer.Deferred` (when running under Twisted) or
an instance of :class:`asyncio.Future` (when running under asyncio).
:returns: A Deferred/Future for the call result -
:rtype: instance of :tx:`twisted.internet.defer.Deferred` / :py:class:`asyncio.Future`
"""
......@@ -414,18 +425,21 @@ class IRegistration(object):
Unregister this registration that was previously created from
:func:`autobahn.wamp.interfaces.ICallee.register`.
After a registration has been unregistered, calls won't get routed
to the endpoint any more.
After a registration has been unregistered successfully, no calls
will be routed to the endpoint anymore.
This will return a Deferred/Future, that when resolved signals
successful unregistration.
Returns an instance of :tx:`twisted.internet.defer.Deferred` (when
running on **Twisted**) or an instance of :py:class:`asyncio.Future`
(when running on **asyncio**).
If the unregistration fails, the returned Deferred/Future will be rejected
with an instance of :class:`autobahn.wamp.exception.ApplicationError`.
- If the unregistration succeeds, the returned Deferred/Future will
*resolve* (with no return value).
:returns: obj -- A Deferred/Future for the unregistration -
an instance of :class:`twisted.internet.defer.Deferred` (when running under Twisted)
or an instance of :class:`asyncio.Future` (when running under asyncio).
- If the unregistration fails, the returned Deferred/Future will be rejected
with an instance of :class:`autobahn.wamp.exception.ApplicationError`.
:returns: A Deferred/Future for the unregistration
:rtype: instance(s) of :tx:`twisted.internet.defer.Deferred` / :py:class:`asyncio.Future`
"""
......@@ -438,37 +452,34 @@ class ICallee(ISession):
@abc.abstractmethod
def register(self, endpoint, procedure = None, options = None):
"""
Register an endpoint for a procedure to (subsequently) receive calls
calling that procedure.
Register a procedure for remote calling.
If ``endpoint`` is a callable (function, method or object that implements ``__call__``),
then `procedure` must be provided and an instance of
:class:`twisted.internet.defer.Deferred` (when running on Twisted) or an instance
of :class:`asyncio.Future` (when running on asyncio) is returned.
When ``endpoint`` is a callable (function, method or object that implements ``__call__``),
then ``procedure`` must be provided and an instance of
:tx:`twisted.internet.defer.Deferred` (when running on **Twisted**) or an instance
of :py:class:`asyncio.Future` (when running on **asyncio**) is returned.
If the registration succeeds the Deferred/Future will resolve to an object
that implements :class:`autobahn.wamp.interfaces.Registration`.
- If the registration *succeeds* the returned Deferred/Future will *resolve* to
an object that implements :class:`autobahn.wamp.interfaces.IRegistration`.
If the registration fails the Deferred/Future will reject with an instance
of :class:`autobahn.wamp.exception.ApplicationError`.
- If the registration *fails* the returned Deferred/Future will *reject* with an
instance of :class:`autobahn.wamp.exception.ApplicationError`.
If ``endpoint`` is an object, then each of the object's methods that are decorated
with :func:`autobahn.wamp.register` are registered as procedure endpoints, and a list of
When ``endpoint`` is an object, then each of the object's methods that is decorated
with :func:`autobahn.wamp.register` is automatically registered and a list of
Deferreds/Futures is returned that each resolves or rejects as above.
:param endpoint: The endpoint or endpoint object called under the procedure.
:type endpoint: callable
:param procedure: When ``endpoint`` is a single event handler, the URI (or URI pattern)
of the procedure to register for. When ``endpoint`` is an endpoint
object, this value is ignored (and should be ``None``).
:type procedure: str
:param endpoint: The endpoint called under the procedure.
:type endpoint: callable or object
:param procedure: When ``endpoint`` is a callable, the URI (or URI pattern)
of the procedure to register for. When ``endpoint`` is an object,
the argument is ignored (and should be ``None``).
:type procedure: unicode
:param options: Options for registering.
:type options: An instance of :class:`autobahn.wamp.types.RegisterOptions`.
:type options: instance of :class:`autobahn.wamp.types.RegisterOptions`.
:returns: obj -- A (list of) Deferred(s)/Future(s) for the registration(s) -
instance(s) of :class:`twisted.internet.defer.Deferred` (when
running under Twisted) or instance(s) of :class:`asyncio.Future`
(when running under asyncio).
:returns: A registration or a list of registrations (or errors)
:rtype: instance(s) of :tx:`twisted.internet.defer.Deferred` / :py:class:`asyncio.Future`
"""
......@@ -501,26 +512,29 @@ class IPublisher(ISession):
:class:`autobahn.wamp.types.PublishOptions`, this will provide
specific options for the publish to perform.
If publication acknowledgement is requested via ``options.acknowledge == True``,
.. note::
By default, publications are non-acknowledged and the publication can
fail silently, e.g. because the session is not authorized to publish
to the topic.
When publication acknowledgement is requested via ``options.acknowledge == True``,
this function returns a Deferred/Future:
- if the publication succeeds the Deferred/Future will resolve to an object
- If the publication succeeds the Deferred/Future will resolve to an object
that implements :class:`autobahn.wamp.interfaces.IPublication`.
- if the publication fails the Deferred/Future will reject with an instance
- If the publication fails the Deferred/Future will reject with an instance
of :class:`autobahn.wamp.exception.ApplicationError`.
:param topic: The URI of the topic to publish to, e.g. ``"com.myapp.mytopic1"``.
:type topic: str
:param topic: The URI of the topic to publish to, e.g. ``u"com.myapp.mytopic1"``.
:type topic: unicode
:param args: Arbitrary application payload for the event (positional arguments).
:type args: list
:param kwargs: Arbitrary application payload for the event (keyword arguments).
:type kwargs: dict
:returns: obj -- ``None`` for non-acknowledged publications or,
for acknowledged publications, an instance of
:class:`twisted.internet.defer.Deferred` (when running under Twisted)
or an instance of :class:`asyncio.Future` (when running under asyncio).
:returns: Acknowledgement for acknowledge publications - otherwise nothing.
:rtype: ``None`` or instance of :tx:`twisted.internet.defer.Deferred` / :py:class:`asyncio.Future`
"""
......@@ -551,18 +565,21 @@ class ISubscription(object):
Unsubscribe this subscription that was previously created from
:func:`autobahn.wamp.interfaces.ISubscriber.subscribe`.
After a subscription has been unsubscribed, events won't get
routed to the handler anymore.
After a subscription has been unsubscribed successfully, no events
will be routed to the event handler anymore.
This will return a Deferred/Future, that when resolved signals
successful unsubscription.
Returns an instance of :tx:`twisted.internet.defer.Deferred` (when
running on **Twisted**) or an instance of :py:class:`asyncio.Future`
(when running on **asyncio**).
If the unsubscription fails, the returned Deferred/Future will be rejected
with an instance of :class:`autobahn.wamp.exception.ApplicationError`.
- If the unsubscription succeeds, the returned Deferred/Future will
*resolve* (with no return value).
:returns: obj -- A Deferred/Future for the unsubscription -
an instance of :class:`twisted.internet.defer.Deferred` (when running under Twisted)
or an instance of :class:`asyncio.Future` (when running under asyncio).
- If the unsubscription fails, the returned Deferred/Future will *reject*
with an instance of :class:`autobahn.wamp.exception.ApplicationError`.
:returns: A Deferred/Future for the unsubscription
:rtype: instance(s) of :tx:`twisted.internet.defer.Deferred` / :py:class:`asyncio.Future`
"""
......@@ -575,36 +592,34 @@ class ISubscriber(ISession):
@abc.abstractmethod
def subscribe(self, handler, topic = None, options = None):
"""
Subscribe to a topic and subsequently receive events published to that topic.
Subscribe to a topic for receiving events.
If ``handler`` is a callable (function, method or object that implements ``__call__``),
When ``handler`` is a callable (function, method or object that implements ``__call__``),
then `topic` must be provided and an instance of
:class:`twisted.internet.defer.Deferred` (when running on Twisted) or an instance
of :class:`asyncio.Future` (when running on asyncio) is returned.
:tx:`twisted.internet.defer.Deferred` (when running on **Twisted**) or an instance
of :class:`asyncio.Future` (when running on **asyncio**) is returned.
If the subscription succeeds the Deferred/Future will resolve to an object
that implements :class:`autobahn.wamp.interfaces.ISubscription`.
- If the subscription succeeds the Deferred/Future will resolve to an object
that implements :class:`autobahn.wamp.interfaces.ISubscription`.
If the subscription fails the Deferred/Future will reject with an instance
of :class:`autobahn.wamp.exception.ApplicationError`.
- If the subscription fails the Deferred/Future will reject with an instance
of :class:`autobahn.wamp.exception.ApplicationError`.
If ``handler`` is an object, then each of the object's methods that are decorated
with :func:`autobahn.wamp.subscribe` are subscribed as event handlers, and a list of
Deferreds/Futures is returned that each resolves or rejects as above.
When ``handler`` is an object, then each of the object's methods that is decorated
with :func:`autobahn.wamp.subscribe` is automatically subscribed as event handlers,
and a list of Deferreds/Futures is returned that each resolves or rejects as above.
:param handler: The event handler or handler object to receive events.
:type handler: callable or obj
:param topic: When ``handler`` is a single event handler, the URI (or URI pattern)
of the topic to subscribe to. When ``handler`` is an event handler
object, this value is ignored (and should be ``None``).
:type topic: str
:param handler: The event handler to receive events.
:type handler: callable or object
:param topic: When ``handler`` is a callable, the URI (or URI pattern)
of the topic to subscribe to. When ``handler`` is an object, this
value is ignored (and should be ``None``).
:type topic: unicode
:param options: Options for subscribing.
:type options: An instance of :class:`autobahn.wamp.types.SubscribeOptions`.
:returns: obj -- A (list of) Deferred(s)/Future(s) for the subscription(s) -
instance(s) of :class:`twisted.internet.defer.Deferred` (when
running under Twisted) or instance(s) of :class:`asyncio.Future`
(when running under asyncio).
:returns: A single Deferred/Future or a list of such objects
:rtype: instance(s) of :tx:`twisted.internet.defer.Deferred` / :py:class:`asyncio.Future`
"""
......
......@@ -551,7 +551,7 @@ class Challenge(Message):
raise ProtocolError("invalid message length {0} for CHALLENGE".format(len(wmsg)))
method = wmsg[1]
if type(method) != str:
if type(method) != six.text_type:
raise ProtocolError("invalid type {0} for 'method' in CHALLENGE".format(type(method)))
extra = check_or_raise_extra(wmsg[2], "'extra' in CHALLENGE")
......
......@@ -356,7 +356,7 @@ class ApplicationSession(BaseSession):
self.join(self.config.realm)
def join(self, realm):
def join(self, realm, authmethods = None, authid = None):
"""
Implements :func:`autobahn.wamp.interfaces.ISession.join`
"""
......@@ -375,7 +375,7 @@ class ApplicationSession(BaseSession):
role.RoleCalleeFeatures()
]
msg = message.Hello(realm, roles)
msg = message.Hello(realm, roles, authmethods, authid)
self._realm = realm
self._transport.send(msg)
......@@ -396,13 +396,36 @@ class ApplicationSession(BaseSession):
"""
if self._session_id is None:
## the first message MUST be WELCOME
## the first message must be WELCOME, ABORT or CHALLENGE ..
##
if isinstance(msg, message.Welcome):
self._session_id = msg.session
details = SessionDetails(self._realm, self._session_id, msg.authid, msg.authrole, msg.authmethod)
self._as_future(self.onJoin, details)
#self.onJoin(details)
elif isinstance(msg, message.Abort):
## fire callback and close the transport
self.onLeave(types.CloseDetails(msg.reason, msg.message))
elif isinstance(msg, message.Challenge):
challenge = types.Challenge(msg.method, msg.extra)
d = self._as_future(self.onChallenge, challenge)
def success(signature):
reply = message.Authenticate(signature)
self._transport.send(reply)
def error(err):
reply = message.Abort(u"wamp.error.cannot_authenticate", u"{0}".format(err.value))
self._transport.send(reply)
## fire callback and close the transport
self.onLeave(types.CloseDetails(reply.reason, reply.message))
self._add_future_callbacks(d, success, error)
else:
raise ProtocolError("Received {} message, and session is not yet established".format(msg.__class__))
......@@ -737,6 +760,13 @@ class ApplicationSession(BaseSession):
self.onDisconnect()
def onChallenge(self, challenge):
"""
Implements :func:`autobahn.wamp.interfaces.ISession.onChallenge`
"""
raise Exception("received authentication challenge, but onChallenge not implemented")
def onJoin(self, details):
"""
Implements :func:`autobahn.wamp.interfaces.ISession.onJoin`
......@@ -1281,6 +1311,16 @@ class RouterSession(BaseSession):
self._add_future_callbacks(d, success, failed)
elif isinstance(msg, message.Abort):
## fire callback and close the transport
self.onLeave(types.CloseDetails(msg.reason, msg.message))
self._session_id = None
self._pending_session_id = None
#self._transport.close()
else:
raise ProtocolError("Received {} message, and session is not yet established".format(msg.__class__))
......
......@@ -2236,7 +2236,7 @@ class CallHandler(Handler):
self.proto.onAfterSendCallError(rmsg, call)
if killsession:
self.proto.sendClose(3000, "killing WAMP session upon request by application exception")
self.proto.sendClose(3000, u"killing WAMP session upon request by application exception")
else:
raise Exception("fatal: internal error in CallHandler._sendCallError")
......
......@@ -52,7 +52,7 @@ from autobahn.websocket.interfaces import IWebSocketChannel, \
IWebSocketChannelFrameApi, \
IWebSocketChannelStreamingApi
from autobahn.util import Stopwatch
from autobahn.util import Stopwatch, newid
from autobahn.websocket.utf8validator import Utf8Validator
from autobahn.websocket.xormasker import XorMaskerNull, createXorMasker
from autobahn.websocket.compress import *
......@@ -634,7 +634,10 @@ class WebSocketProtocol:
'echoCloseCodeReason',
'openHandshakeTimeout',
'closeHandshakeTimeout',
'tcpNoDelay']
'tcpNoDelay',
'autoPingInterval',
'autoPingTimeout',
'autoPingSize']
"""
Configuration attributes common to servers and clients.
"""
......@@ -950,6 +953,18 @@ class WebSocketProtocol:
self.factory._log("skipping onCloseHandshakeTimeout since connection is already closed")
def onAutoPingTimeout(self):
"""
When doing automatic ping/pongs to detect broken connection, the peer
did not reply in time to our ping. We drop the connection.
"""
if self.debugCodePaths:
self.factory._log("Auto ping/pong: onAutoPingTimeout fired")
self.autoPingTimeoutCall = None
self.dropConnection(abort = True)
def dropConnection(self, abort = False):
"""
Drop the underlying TCP connection.
......@@ -1180,6 +1195,10 @@ class WebSocketProtocol:
if self.openHandshakeTimeout > 0:
self.openHandshakeTimeoutCall = self.factory._callLater(self.openHandshakeTimeout, self.onOpenHandshakeTimeout)
self.autoPingTimeoutCall = None
self.autoPingPending = None
self.autoPingPendingCall = None
def _connectionLost(self, reason):
"""
......@@ -1195,6 +1214,20 @@ class WebSocketProtocol:
self.serverConnectionDropTimeoutCall.cancel()
self.serverConnectionDropTimeoutCall = None
## cleanup auto ping/pong timers
##
if self.autoPingPendingCall:
if self.debugCodePaths:
self.factory._log("Auto ping/pong: canceling autoPingPendingCall upon lost connection")
self.autoPingPendingCall.cancel()
self.autoPingPendingCall = None
if self.autoPingTimeoutCall:
if self.debugCodePaths:
self.factory._log("Auto ping/pong: canceling autoPingTimeoutCall upon lost connection")
self.autoPingTimeoutCall.cancel()
self.autoPingTimeoutCall = None
self.state = WebSocketProtocol.STATE_CLOSED
if not self.wasClean:
if not self.droppedByMe and self.wasNotCleanReason is None:
......@@ -1930,6 +1963,46 @@ class WebSocketProtocol:
## PONG frame
##
elif self.current_frame.opcode == 10:
## auto ping/pong processing
##
if self.autoPingPending:
try:
p = payload.decode('utf8')
if p == self.autoPingPending:
if self.debugCodePaths:
self.factory._log("Auto ping/pong: received pending pong for auto-ping/pong")
if self.autoPingTimeoutCall:
self.autoPingTimeoutCall.cancel()
self.autoPingPending = None
self.autoPingTimeoutCall = None
if self.autoPingInterval:
def send():
if self.debugCodePaths:
self.factory._log("Auto ping/pong: sending ping auto-ping/pong")
self.autoPingPendingCall = None
self.autoPingPending = newid(self.autoPingSize)
self.sendPing(self.autoPingPending.encode('utf8'))
if self.autoPingTimeout:
if self.debugCodePaths:
self.factory._log("Auto ping/pong: expecting ping in {0} seconds for auto-ping/pong".format(self.autoPingTimeout))
self.autoPingTimeoutCall = self.factory._callLater(self.autoPingTimeout, self.onAutoPingTimeout)
self.autoPingPendingCall = self.factory._callLater(self.autoPingInterval, send)
else:
if self.debugCodePaths:
self.factory._log("Auto ping/pong: received non-pending pong")
except:
if self.debugCodePaths:
self.factory._log("Auto ping/pong: received non-pending pong")
## fire app-level callback
##
self._onPong(payload)
else:
......@@ -3207,6 +3280,15 @@ class WebSocketServerProtocol(WebSocketProtocol):
if self.websocket_version != 0:
self.current_frame = None
## automatic ping/pong
##
if self.autoPingInterval:
self.autoPingPending = newid(self.autoPingSize)
self.sendPing(self.autoPingPending.encode('utf8'))
if self.autoPingTimeout:
self.autoPingTimeoutCall = self.factory._callLater(self.autoPingTimeout, self.onAutoPingTimeout)
## fire handler on derived class
##
if self.trackedTimings:
......@@ -3455,6 +3537,12 @@ class WebSocketServerFactory(WebSocketFactory):
##
self.perMessageCompressionAccept = lambda _: None
## automatic ping/pong ("heartbearting")
##
self.autoPingInterval = 0
self.autoPingTimeout = 0
self.autoPingSize = 4
def setProtocolOptions(self,
versions = None,
......@@ -3472,7 +3560,10 @@ class WebSocketServerFactory(WebSocketFactory):
openHandshakeTimeout = None,
closeHandshakeTimeout = None,
tcpNoDelay = None,
perMessageCompressionAccept = None):
perMessageCompressionAccept = None,
autoPingInterval = None,
autoPingTimeout = None,
autoPingSize = None):
"""
Set WebSocket protocol options used as defaults for new protocol instances.
......@@ -3508,6 +3599,14 @@ class WebSocketServerFactory(WebSocketFactory):
:type tcpNoDelay: bool
:param perMessageCompressionAccept: Acceptor function for offers.
:type perMessageCompressionAccept: callable
:param autoPingInterval: Automatically send WebSocket pings every given seconds. When the peer does not respond
in `autoPingTimeout`, drop the connection. Set to `0` to disable. (default: `0`).
:type autoPingInterval: float or None
:param autoPingTimeout: Wait this many seconds for the peer to respond to automatically sent pings. If the
peer does not respond in time, drop the connection. Set to `0` to disable. (default: `0`).
:type autoPingTimeout: float or None
:param autoPingSize: Payload size for automatic pings/pongs. Must be an integer from `[4, 125]`. (default: `4`).
:type autoPingSize: int
"""
if allowHixie76 is not None and allowHixie76 != self.allowHixie76:
self.allowHixie76 = allowHixie76
......@@ -3563,6 +3662,17 @@ class WebSocketServerFactory(WebSocketFactory):
if perMessageCompressionAccept is not None and perMessageCompressionAccept != self.perMessageCompressionAccept:
self.perMessageCompressionAccept = perMessageCompressionAccept
if autoPingInterval is not None and autoPingInterval != self.autoPingInterval:
self.autoPingInterval = autoPingInterval
if autoPingTimeout is not None and autoPingTimeout != self.autoPingTimeout:
self.autoPingTimeout = autoPingTimeout
if autoPingSize is not None and autoPingSize != self.autoPingSize:
assert(type(autoPingSize) == float or type(autoPingSize) in six.integer_types)
assert(autoPingSize >= 4 and autoPingSize <= 125)
self.autoPingSize = autoPingSize