connection.py 33.6 KB
Newer Older
1 2 3 4
#    Licensed to the Apache Software Foundation (ASF) under one
#    or more contributor license agreements.  See the NOTICE file
#    distributed with this work for additional information
#    regarding copyright ownership.
5
#
6 7 8
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
9
#
10
#         http://www.apache.org/licenses/LICENSE-2.0
11
#
12 13 14 15 16
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
17

Kenneth Giusti's avatar
Kenneth Giusti committed
18 19 20
# Names exported by "from <module> import *": the two public classes
# defined in this module.
__all__ = [
    "ConnectionEventHandler",
    "Connection"
]
Kenneth Giusti's avatar
Kenneth Giusti committed
22

23
import heapq
24
import logging
Kenneth Giusti's avatar
Kenneth Giusti committed
25
import proton
26
import warnings
Kenneth Giusti's avatar
Kenneth Giusti committed
27

28
from pyngus.endpoint import Endpoint
29
from pyngus.link import _Link
30
from pyngus.link import _SessionProxy
31

Kenneth Giusti's avatar
Kenneth Giusti committed
32 33
# Logger shared by everything in this module.
LOG = logging.getLogger(__name__)

# (major, minor) version of the imported proton library.  A version attribute
# that proton does not define is reported as 0.
_PROTON_VERSION = tuple(int(getattr(proton, attr, 0))
                        for attr in ("VERSION_MAJOR", "VERSION_MINOR"))

Kenneth Giusti's avatar
Kenneth Giusti committed
37

Kenneth Giusti's avatar
Kenneth Giusti committed
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
class _CallbackLock(object):
    """A utility class for detecting when a callback invokes a non-reentrant
    Pyngus method.
    """
    def __init__(self):
        super(_CallbackLock, self).__init__()
        self.in_callback = 0

    def __enter__(self):
        self.in_callback += 1
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.in_callback -= 1
        # if a call is made to a non-reentrant method while this context is
        # held, then the method will raise a RuntimeError().  Return false to
        # propagate the exception to the caller
        return False
56

Kenneth Giusti's avatar
Kenneth Giusti committed
57

58
class ConnectionEventHandler(object):
    """Callback interface for events that occur on a Connection.

    An instance is handed to a Connection as its event handler; the
    Connection then invokes these methods to report events.  Subclass and
    override the callbacks of interest: every default implementation only
    logs that the event was ignored.
    """
    def connection_active(self, connection):
        """Connection handshake has completed."""
        LOG.debug("connection_active (ignored)")

    def connection_failed(self, connection, error):
        """Connection's transport has failed in some way.

        error: description of the failure.
        """
        # Logger.warn() is a deprecated alias that no longer exists in
        # recent Python releases; warning() is the supported spelling.
        LOG.warning("connection_failed, error=%s (ignored)", str(error))

    def connection_remote_closed(self, connection, pn_condition):
        """Peer has closed its end of the connection."""
        LOG.debug("connection_remote_closed (ignored)")

    def connection_closed(self, connection):
        """The connection has cleanly closed."""
        LOG.debug("connection_closed (ignored)")

    def sender_requested(self, connection, link_handle,
                         name, requested_source,
                         properties):
        """Peer has requested a SenderLink be created."""
        # call accept_sender to accept new link,
        # reject_sender to reject it.
        LOG.debug("sender_requested (ignored)")

    def receiver_requested(self, connection, link_handle,
                           name, requested_target,
                           properties):
        """Peer has requested a ReceiverLink be created."""
        # call accept_receiver to accept new link,
        # reject_receiver to reject it.
        LOG.debug("receiver_requested (ignored)")

    # No longer supported by proton >= 0.10, so this method is deprecated
    def sasl_step(self, connection, pn_sasl):
        """DEPRECATED"""
        LOG.debug("sasl_step (ignored)")

    def sasl_done(self, connection, pn_sasl, result):
        """SASL exchange complete.

        result: the outcome reported by the SASL layer.
        """
        LOG.debug("sasl_done (ignored)")
100

101

Kenneth Giusti's avatar
Kenneth Giusti committed
102
class Connection(Endpoint):
103
    """A Connection to a peer."""
104 105
    # Marker value meaning 'I/O stream closed'.
    EOS = -1

    # Names of all the connection properties that configure SASL.
    _SASL_PROPS = set(('x-username', 'x-password', 'x-require-auth',
                       'x-sasl-mechs', 'x-sasl-config-dir',
                       'x-sasl-config-name', 'x-force-sasl'))
110

Kenneth Giusti's avatar
Kenneth Giusti committed
111 112 113 114 115
    def _not_reentrant(func):
        """Decorator that prevents callbacks from calling into methods that are
        not reentrant
        """
        def wrap(self, *args, **kws):
116
            if self._callback_lock and self._callback_lock.in_callback:
Kenneth Giusti's avatar
Kenneth Giusti committed
117 118 119 120 121
                m = "Connection %s cannot be invoked from a callback!" % func
                raise RuntimeError(m)
            return func(self, *args, **kws)
        return wrap

122
    def __init__(self, container, name, event_handler=None, properties=None):
        """Create a new connection from the Container

        container: the Container that owns this connection.
        name: the name of this connection.
        event_handler: optional object whose callbacks are invoked to report
        connection events.

        properties: map, properties of the new connection.  The map is
        copied; the caller's map is never modified.  The following keys and
        values are supported:

        idle-time-out: float, time in seconds before an idle link will be
        closed.

        hostname: string, the name of the host to which this connection is
        being made, sent in the Open frame.

        max-frame-size: int, maximum acceptable frame size in bytes.

        properties: map, proton connection properties sent to the peer.

        The following custom connection properties are supported:

        x-server: boolean, set this to True to configure the connection as a
        server side connection.  This should be set True if the connection was
        remotely initiated (e.g. accept on a listening socket).  If the
        connection was locally initiated (e.g. by calling connect()), then this
        value should be set to False.  This setting is used by authentication
        and encryption to configure the connection's role.  The default value
        is False for client mode.  If x-server is not supplied, the value of
        the older 'x-ssl-server' key is used when present.

        x-username: string, the client's username to use when authenticating
        with a server.

        x-password: string, the client's password, used for authentication.

        x-require-auth: boolean, reject remotely-initiated client connections
        that fail to provide valid credentials for authentication.

        x-sasl-mechs: string, a space-separated list of mechanisms
        that are allowed for authentication.  Defaults to "ANONYMOUS"

        x-sasl-config-dir: string, path to the directory containing the Cyrus
        SASL server configuration.

        x-sasl-config-name: string, name of the Cyrus SASL configuration file
        contained in the x-sasl-config-dir (without the '.conf' suffix)

        x-force-sasl: by default SASL authentication is disabled.  SASL will be
        enabled if any of the above x-sasl-* options are set. For clients using
        GSSAPI it is likely none of these options will be set.  In order for
        these clients to authenticate this flag must be set true.  The value of
        this property is ignored if any of the other SASL related properties
        are set.

        x-ssl-identity: tuple, contains identifying certificate information
        which will be presented to the peer.  The first item in the tuple is
        the path to the certificate file (PEM format).  The second item is the
        path to a file containing the private key used to sign the certificate
        (PEM format, optional if private key is stored in the certificate
        itself). The last item is the password used to encrypt the private key
        (string, not required if private key is not encrypted)

        x-ssl-ca-file: string, path to a file containing the certificates of
        the trusted Certificate Authorities that will be used to check the
        signature of the peer's certificate.

        x-ssl-verify-mode: string, configure the level of security provided by
        SSL.  Possible values:
            "verify-peer" (default) - most secure, requires peer to supply a
            certificate signed by a valid CA (see x-ssl-ca-file), and check
            the CN or SAN entry in the certificate against the expected
            peer hostname (see hostname and x-ssl-peer-name properties)
            "verify-cert" (default if no x-ssl-peer-name given) - like
            verify-peer, but skips the check of the peer hostname.
             Vulnerable to man-in-the-middle attack.
            "no-verify" - do not require the peer to provide a certificate.
            Results in a weaker encryption stream, and other vulnerabilities.

        x-ssl-peer-name: string, DNS name of peer.  Override the hostname used
        to authenticate peer's certificate (see x-ssl-verify-mode).  The value
        of the 'hostname' property is used if this property is not supplied.

        x-ssl-allow-cleartext: boolean, Allows clients to connect without using
        SSL (eg, plain TCP). Used by a server that will accept clients
        requesting either trusted or untrusted connections.

        x-trace-protocol: boolean, if true, dump sent and received frames to
        stdout.

        Any exception raised while configuring SSL is re-raised after the
        partially constructed connection has been destroyed.
        """
        super(Connection, self).__init__(name)
        self._transport_bound = False
        self._container = container
        self._handler = event_handler
        # Keep a private copy: the x-force-sasl handling below removes a key,
        # and that must not alter the map owned by the caller.
        self._properties = dict(properties) if properties else {}
        # 'x-ssl-server' is the older spelling; it only supplies the default
        # used when 'x-server' is absent
        old_flag = self._properties.get('x-ssl-server', False)
        self._server = self._properties.get('x-server', old_flag)

        self._pn_connection = proton.Connection()
        self._pn_connection.container = container.name
        if (_PROTON_VERSION < (0, 9)):
            self._pn_transport = proton.Transport()
        else:
            if self._server:
                mode = proton.Transport.SERVER
            else:
                mode = proton.Transport.CLIENT
            self._pn_transport = proton.Transport(mode)
        self._pn_collector = proton.Collector()
        self._pn_connection.collect(self._pn_collector)

        if 'hostname' in self._properties:
            self._pn_connection.hostname = self._properties['hostname']
        secs = self._properties.get("idle-time-out")
        if secs:
            self._pn_transport.idle_timeout = secs
        max_frame = self._properties.get("max-frame-size")
        if max_frame:
            self._pn_transport.max_frame_size = max_frame
        if 'properties' in self._properties:
            self._pn_connection.properties = self._properties["properties"]
        if self._properties.get("x-trace-protocol"):
            self._pn_transport.trace(proton.Transport.TRACE_FRM)

        # indexed by link-name
        self._sender_links = {}    # SenderLink
        self._receiver_links = {}  # ReceiverLink

        self._timers = {}  # indexed by expiration date
        self._timers_heap = []  # sorted by expiration date

        self._read_done = False
        self._write_done = False
        self._error = None
        self._next_deadline = 0
        self._user_context = None
        self._remote_session_id = 0
        self._callback_lock = _CallbackLock()

        self._pn_sasl = None
        self._sasl_done = False

        # if x-force-sasl is false remove it so it does not trigger the SASL
        # configuration logic below
        if not self._properties.get('x-force-sasl', True):
            del self._properties['x-force-sasl']

        if self._SASL_PROPS.intersection(set(self._properties.keys())):
            # SASL config specified, need to enable SASL
            if (_PROTON_VERSION < (0, 10)):
                # best effort map of 0.10 sasl config to pre-0.10 sasl
                if self._server:
                    self.pn_sasl.server()
                    if 'x-require-auth' in self._properties:
                        if not self._properties['x-require-auth']:
                            if _PROTON_VERSION >= (0, 8):
                                self.pn_sasl.allow_skip()
                else:
                    if 'x-username' in self._properties:
                        self.pn_sasl.plain(self._properties['x-username'],
                                           self._properties.get('x-password',
                                                                ''))
                    else:
                        self.pn_sasl.client()
                mechs = self._properties.get('x-sasl-mechs')
                if mechs:
                    self.pn_sasl.mechanisms(mechs)
            else:
                # new Proton SASL configuration:
                # maintain old behavior: allow PLAIN and ANONYMOUS
                # authentication.  Override this using x-sasl-mechs below:
                self.pn_sasl.allow_insecure_mechs = True
                if 'x-require-auth' in self._properties:
                    ra = self._properties['x-require-auth']
                    self._pn_transport.require_auth(ra)
                if 'x-username' in self._properties:
                    self._pn_connection.user = self._properties['x-username']
                if 'x-password' in self._properties:
                    self._pn_connection.password = \
                        self._properties['x-password']
                if 'x-sasl-mechs' in self._properties:
                    mechs = self._properties['x-sasl-mechs'].upper()
                    self.pn_sasl.allowed_mechs(mechs)
                    if 'PLAIN' not in mechs and 'ANONYMOUS' not in mechs:
                        self.pn_sasl.allow_insecure_mechs = False
                if 'x-sasl-config-dir' in self._properties:
                    self.pn_sasl.config_path(
                        self._properties['x-sasl-config-dir'])
                if 'x-sasl-config-name' in self._properties:
                    self.pn_sasl.config_name(
                        self._properties['x-sasl-config-name'])

        # intercept any SSL failures and cleanup resources before propagating
        # the exception (the bare except is deliberate: the error is always
        # re-raised once destroy() has run):
        try:
            self._pn_ssl = self._configure_ssl(properties)
        except:
            self.destroy()
            raise
316

Kenneth Giusti's avatar
Kenneth Giusti committed
317 318 319 320
    @property
    def container(self):
        """The Container supplied when this Connection was created."""
        owner = self._container
        return owner

321
    @property
    def pn_transport(self):
        """The proton Transport object used by this Connection."""
        # TODO(kgiusti) - hopefully remove
        transport = self._pn_transport
        return transport

    @property
    def pn_connection(self):
        """The proton Connection object wrapped by this Connection."""
        # TODO(kgiusti) - hopefully remove
        wrapped = self._pn_connection
        return wrapped

    @property
    def name(self):
        """The name of this Connection (read from ``_name``)."""
        ident = self._name
        return ident

335 336 337 338 339 340 341
    @property
    def remote_container(self):
        """The name of the container at the remote end of the connection.

        Should be present once the connection is active.
        """
        pn_conn = self._pn_connection
        return pn_conn.remote_container

342 343 344 345 346 347 348
    @property
    def remote_hostname(self):
        """The hostname advertised by the remote, or None when there is no
        underlying proton connection.
        """
        pn_conn = self._pn_connection
        return pn_conn.remote_hostname if pn_conn else None

349 350 351 352 353 354 355
    @property
    def remote_properties(self):
        """The properties provided by the peer, or None when there is no
        underlying proton connection.
        """
        pn_conn = self._pn_connection
        return pn_conn.remote_properties if pn_conn else None

356 357
    @property
    def pn_sasl(self):
        """The SASL object of the transport.

        It is obtained from the transport on first access and cached for
        later calls.
        """
        if self._pn_sasl:
            return self._pn_sasl
        self._pn_sasl = self._pn_transport.sasl()
        return self._pn_sasl
361

362
    def pn_ssl(self):
        """Return the Proton SSL context for this Connection.

        Note: unlike its neighbours this is a plain method, not a property.
        """
        ssl_context = self._pn_ssl
        return ssl_context

366 367 368 369 370 371
    def _get_user_context(self):
        """Read accessor behind the ``user_context`` property."""
        return self._user_context

    def _set_user_context(self, ctxt):
        """Write accessor behind the ``user_context`` property."""
        self._user_context = ctxt

    _uc_docstr = """Associate an arbitrary user object with this Connection."""
    # read/write property built from the two accessors above
    user_context = property(fget=_get_user_context,
                            fset=_set_user_context,
                            doc=_uc_docstr)
375

376
    def open(self):
        """Open the local end of the connection.

        The transport is bound to the proton connection the first time this
        is called.  The proton connection itself is only opened while its
        local state is still uninitialized.
        """
        if not self._transport_bound:
            self._pn_transport.bind(self._pn_connection)
            self._transport_bound = True
        not_yet_opened = (self._pn_connection.state &
                          proton.Endpoint.LOCAL_UNINIT)
        if not_yet_opened:
            self._pn_connection.open()
382

383
    def close(self, pn_condition=None):
        """Close every link and then the local end of the connection.

        pn_condition: optional condition; it is passed to each link's
        close() and, when truthy, set on the proton connection.
        """
        # senders first, then receivers; each map is snapshotted right before
        # its links are closed
        for links in (self._sender_links, self._receiver_links):
            for link in list(links.values()):
                link.close(pn_condition)
        if pn_condition:
            self._pn_connection.condition = pn_condition
        locally_active = (self._pn_connection.state &
                          proton.Endpoint.LOCAL_ACTIVE)
        if locally_active:
            self._pn_connection.close()
392

393 394 395
    @property
    def active(self):
        """True when the endpoint state equals ``_ACTIVE``, i.e. both ends of
        the Connection are open.
        """
        state = self._endpoint_state
        return state == self._ACTIVE
397

398 399
    @property
    def closed(self):
        """True once the Connection has finished closing, that is when both
        the output and the input side are done.
        """
        return self._write_done and self._read_done
402

Kenneth Giusti's avatar
Kenneth Giusti committed
403
    @_not_reentrant
    def destroy(self):
        """Release all resources held by this Connection.

        Destroys every link, removes the connection from its container and
        frees the proton objects.  Cannot be invoked from a callback.
        """
        # if a connection is destroyed without flushing pending output,
        # the remote will see an unclean shutdown (framing error)
        if self.has_output > 0:
            LOG.debug("Connection with buffered output destroyed")
        self._error = "Destroyed by the application"
        self._handler = None
        self._properties = None
        # iterate over snapshots: each map must be empty once all of its
        # links have been destroyed
        for links in (self._sender_links, self._receiver_links):
            for link in list(links.values()):
                link.destroy()
            assert(len(links) == 0)
        self._timers.clear()
        self._timers_heap = None
        self._container.remove_connection(self._name)
        self._container = None
        self._user_context = None
        self._callback_lock = None
        if self._transport_bound:
            self._pn_transport.unbind()
        self._pn_transport = None
        self._pn_connection.free()
        self._pn_connection = None
        if _PROTON_VERSION < (0, 8):
            # memory leak: drain the collector before releasing it
            while self._pn_collector.peek():
                self._pn_collector.pop()
        self._pn_collector = None
        self._pn_sasl = None
        self._pn_ssl = None
438

439
    # Endpoint state masks: local and remote end both closed / both active.
    _CLOSED = proton.Endpoint.LOCAL_CLOSED | proton.Endpoint.REMOTE_CLOSED
    _ACTIVE = proton.Endpoint.LOCAL_ACTIVE | proton.Endpoint.REMOTE_ACTIVE
441

Kenneth Giusti's avatar
Kenneth Giusti committed
442
    @_not_reentrant
    def process(self, now):
        """Perform connection state processing.

        Advances SASL, expires timers, ticks the transport, dispatches all
        pending proton events, and finally reports failure or closure to the
        event handler.

        now: the current timestamp; handed to the timers and to the
        transport's tick().

        Returns the next deadline (also stored in ``_next_deadline``), or 0
        when the connection has been destroyed or has not been opened yet.
        Raises RuntimeError if called from a callback (see _not_reentrant).
        """
        if self._pn_connection is None:
            LOG.error("Connection.process() called on destroyed connection!")
            return 0

        # do nothing until the connection has been opened
        if self._pn_connection.state & proton.Endpoint.LOCAL_UNINIT:
            return 0

        if self._pn_sasl and not self._sasl_done:
            # wait until SASL has authenticated
            if (_PROTON_VERSION < (0, 10)):
                # older proton: the handler drives SASL via sasl_step().
                # No timers or events are processed until SASL reaches a
                # pass/fail state.
                if self._pn_sasl.state not in (proton.SASL.STATE_PASS,
                                               proton.SASL.STATE_FAIL):
                    LOG.debug("SASL in progress. State=%s",
                              str(self._pn_sasl.state))
                    if self._handler:
                        with self._callback_lock:
                            self._handler.sasl_step(self, self._pn_sasl)
                    return self._next_deadline

                self._sasl_done = True
                if self._handler:
                    with self._callback_lock:
                        self._handler.sasl_done(self, self._pn_sasl,
                                                self._pn_sasl.outcome)
            else:
                # newer proton: just report the outcome once it is known;
                # processing continues below either way
                if self._pn_sasl.outcome is not None:
                    self._sasl_done = True
                    if self._handler:
                        with self._callback_lock:
                            self._handler.sasl_done(self, self._pn_sasl,
                                                    self._pn_sasl.outcome)

        # process timer events:
        timer_deadline = self._expire_timers(now)
        transport_deadline = self._pn_transport.tick(now)
        # the next deadline is the earlier of the two; a falsy value means
        # that source has no deadline
        if timer_deadline and transport_deadline:
            self._next_deadline = min(timer_deadline, transport_deadline)
        else:
            self._next_deadline = timer_deadline or transport_deadline

        # process events from proton:
        pn_event = self._pn_collector.peek()
        while pn_event:
            # LOG.debug("pn_event: %s received", pn_event.type)
            # offer the event to each handler in turn; the first one that
            # returns a true value stops the chain
            if _Link._handle_proton_event(pn_event, self):
                pass
            elif self._handle_proton_event(pn_event):
                pass
            elif _SessionProxy._handle_proton_event(pn_event, self):
                pass
            # the event is discarded whether or not anything handled it
            self._pn_collector.pop()
            pn_event = self._pn_collector.peek()

        # check for connection failure after processing all pending
        # engine events:
        if self._error:
            if self._handler:
                # nag application until connection is destroyed
                self._next_deadline = now
                with self._callback_lock:
                    self._handler.connection_failed(self, self._error)
        elif (self._endpoint_state == self._CLOSED and
              self._read_done and self._write_done):
            # invoke closed callback after endpoint has fully closed and
            # all pending I/O has completed:
            if self._handler:
                with self._callback_lock:
                    self._handler.connection_closed(self)

        return self._next_deadline
516 517 518

    @property
    def next_tick(self):
        """DEPRECATED: issues a DeprecationWarning and returns the value of
        the ``deadline`` property.
        """
        warnings.warn(
            DeprecationWarning("next_tick deprecated, use deadline instead"))
        return self.deadline

    @property
    def deadline(self):
        """Timestamp on or before which process() must be invoked again."""
        upcoming = self._next_deadline
        return upcoming
527 528 529 530

    @property
    def needs_input(self):
        """Input capacity reported by the transport, or EOS once the input
        side is done.

        Reading this property has side effects: a negative capacity or an
        exception from the transport marks the input side as done, and an
        exception is additionally reported through _connection_failed().
        """
        if self._read_done:
            LOG.debug("needs_input EOS")
            return self.EOS
        try:
            capacity = self._pn_transport.capacity()
        except Exception as e:
            self._read_done = True
            self._connection_failed(str(e))
            return self.EOS
        if capacity < 0:
            # the transport will not accept any more input
            LOG.debug("needs_input read done")
            self._read_done = True
            return self.EOS
        return capacity

    def process_input(self, in_data):
        """Push received network data into the transport.

        At most ``needs_input`` bytes from the front of in_data are consumed.

        Returns the number of bytes consumed.  When nothing can be accepted,
        the non-positive value computed from needs_input is returned as is.
        EOS is returned when the push raises or reports an error.
        """
        count = min(self.needs_input, len(in_data))
        if count <= 0:
            return count
        try:
            failed = self._pn_transport.push(in_data[:count])
        except Exception as e:
            self._read_done = True
            self._connection_failed(str(e))
            return self.EOS
        if failed:  # error?
            LOG.debug("process_input read done")
            self._read_done = True
            return self.EOS
        # hack: check if this was the last input needed by the connection.
        # If so, this will set the _read_done flag and the 'connection closed'
        # callback can be issued on the next call to process()
        self.needs_input
        return count

    def close_input(self, reason=None):
        """Close the input side of the transport; a no-op when input is
        already done.

        reason: accepted but not used by this method.
        """
        if self._read_done:
            return
        try:
            self._pn_transport.close_tail()
        except Exception as e:
            self._connection_failed(str(e))
        # input is marked done even when closing the tail failed
        LOG.debug("close_input read done")
        self._read_done = True
573 574 575 576

    @property
    def has_output(self):
        """Amount of pending output reported by the transport, or EOS once
        the output side is done.

        Reading this property has side effects: a negative pending count or
        an exception from the transport marks the output side as done, and an
        exception is additionally reported through _connection_failed().
        """
        if self._write_done:
            LOG.debug("has output EOS")
            return self.EOS
        try:
            pending = self._pn_transport.pending()
        except Exception as e:
            self._write_done = True
            self._connection_failed(str(e))
            return self.EOS
        if pending < 0:
            # the transport will not produce any more output
            LOG.debug("has output write_done")
            self._write_done = True
            return self.EOS
        return pending

    def output_data(self):
        """Get a buffer of data that needs to be written to the network.

        Returns None when there is no pending output or when peeking at the
        transport fails (the failure is reported via _connection_failed()).
        """
        available = self.has_output
        if available <= 0:
            return None
        try:
            return self._pn_transport.peek(available)
        except Exception as e:
            self._connection_failed(str(e))
            return None
603 604

    def output_written(self, count):
        """Tell the transport that ``count`` units of its pending output
        have been written, by popping them from the transport.

        A failure marks the output side as done and is reported via
        _connection_failed().
        """
        try:
            self._pn_transport.pop(count)
        except Exception as e:
            self._write_done = True
            self._connection_failed(str(e))
        # hack: check if this was the last output from the connection.  If so,
        # this will set the _write_done flag and the 'connection closed'
        # callback can be issued on the next call to process()
        self.has_output
614 615 616

    def close_output(self, reason=None):
        """Close the output side of the transport; a no-op when output is
        already done.

        reason: accepted but not used by this method.
        """
        if self._write_done:
            return
        try:
            self._pn_transport.close_head()
        except Exception as e:
            self._connection_failed(str(e))
        # output is marked done even when closing the head failed
        LOG.debug("close output write done")
        self._write_done = True
623 624

    def create_sender(self, source_address, target_address=None,
                      event_handler=None, name=None, properties=None):
        """Factory method for Sender links.

        The link is identified by ``name`` or, when no name is given, by the
        string form of source_address.  A new session is opened for it.

        Returns the configured sender link.
        Raises KeyError if a sender with that identifier already exists.
        """
        ident = name or str(source_address)
        if ident in self._sender_links:
            raise KeyError("Sender %s already exists!" % ident)

        # each sender gets a session of its own
        session = _SessionProxy("session-%s" % ident, self)
        session.open()
        sender = session.new_sender(ident)
        sender.configure(target_address, source_address,
                         event_handler, properties)
        self._sender_links[ident] = sender
        return sender
637

638
    def accept_sender(self, link_handle, source_override=None,
                      event_handler=None, properties=None):
        """Accept the sender link identified by link_handle.

        source_override: source address to use instead of the one requested
        by the peer; mandatory when the peer asked for a dynamic source.

        Returns the configured link.
        Raises Exception for an unknown link_handle, or when a dynamic
        source was requested and no source_override is given.
        """
        sender = self._sender_links.get(link_handle)
        if not sender:
            raise Exception("Invalid link_handle: %s" % link_handle)
        pn_link = sender._pn_link
        if pn_link.remote_source.dynamic and not source_override:
            raise Exception("A source address must be supplied!")
        if source_override:
            source_addr = source_override
        else:
            source_addr = pn_link.remote_source.address
        sender.configure(pn_link.remote_target.address,
                         source_addr,
                         event_handler, properties)
        return sender
651

652
    def reject_sender(self, link_handle, pn_condition=None):
        """Rejects the SenderLink, and destroys the handle.

        pn_condition: optional condition passed to the link's reject().
        Raises Exception for an unknown link_handle.
        """
        rejected = self._sender_links.get(link_handle)
        if not rejected:
            raise Exception("Invalid link_handle: %s" % link_handle)
        rejected.reject(pn_condition)
        # note: normally, link.destroy() cannot be called from a callback,
        # but this link was never made available to the application so this
        # link is only referenced by the connection
        rejected.destroy()
662

663
    def create_receiver(self, target_address, source_address=None,
                        event_handler=None, name=None, properties=None):
        """Factory method for creating Receive links.

        The link is identified by ``name`` or, when no name is given, by the
        string form of target_address.  A new session is opened for it.

        Returns the configured receiver link.
        Raises KeyError if a receiver with that identifier already exists.
        """
        ident = name or str(target_address)
        if ident in self._receiver_links:
            raise KeyError("Receiver %s already exists!" % ident)

        # each receiver gets a session of its own
        session = _SessionProxy("session-%s" % ident, self)
        session.open()
        receiver = session.new_receiver(ident)
        receiver.configure(target_address, source_address,
                           event_handler, properties)
        self._receiver_links[ident] = receiver
        return receiver
676

677
    def accept_receiver(self, link_handle, target_override=None,
678
                        event_handler=None, properties=None):
679 680
        link = self._receiver_links.get(link_handle)
        if not link:
681
            raise Exception("Invalid link_handle: %s" % link_handle)
682
        pn_link = link._pn_link
Kenneth Giusti's avatar
Kenneth Giusti committed
683
        if pn_link.remote_target.dynamic and not target_override:
684 685
            raise Exception("A target address must be supplied!")
        target_addr = target_override or pn_link.remote_target.address
686 687 688
        link.configure(target_addr,
                       pn_link.remote_source.address,
                       event_handler, properties)
689
        return link
690

691
    def reject_receiver(self, link_handle, pn_condition=None):
692 693
        link = self._receiver_links.get(link_handle)
        if not link:
694
            raise Exception("Invalid link_handle: %s" % link_handle)
695
        link.reject(pn_condition)
Kenneth Giusti's avatar
Kenneth Giusti committed
696 697 698
        # note: normally, link.destroy() cannot be called from a callback,
        # but this link was never made available to the application so this
        # link is only referenced by the connection
699
        link.destroy()
700

Kenneth Giusti's avatar
Kenneth Giusti committed
701 702 703 704
    @property
    def _endpoint_state(self):
        return self._pn_connection.state

705 706 707 708 709 710 711
    def _remove_sender(self, name):
        if name in self._sender_links:
            del self._sender_links[name]

    def _remove_receiver(self, name):
        if name in self._receiver_links:
            del self._receiver_links[name]
712

713
    def _connection_failed(self, error="Error not specified!"):
714
        """Clean up after connection failure detected."""
715
        if not self._error:
716
            LOG.error("Connection failed: %s", str(error))
717
            self._error = error
718 719

    def _configure_ssl(self, properties):
720 721
        if not properties:
            return None
722 723 724 725 726
        verify_modes = {'verify-peer': proton.SSLDomain.VERIFY_PEER_NAME,
                        'verify-cert': proton.SSLDomain.VERIFY_PEER,
                        'no-verify': proton.SSLDomain.ANONYMOUS_PEER}

        mode = proton.SSLDomain.MODE_CLIENT
727
        if properties.get('x-ssl-server', properties.get('x-server')):
728 729 730 731 732 733
            mode = proton.SSLDomain.MODE_SERVER

        identity = properties.get('x-ssl-identity')
        ca_file = properties.get('x-ssl-ca-file')

        if not identity and not ca_file:
734
            return None  # SSL not configured
735 736 737 738 739 740 741 742 743 744

        hostname = None
        # This will throw proton.SSLUnavailable if SSL support is not installed
        domain = proton.SSLDomain(mode)
        if identity:
            # our identity:
            domain.set_credentials(identity[0], identity[1], identity[2])
        if ca_file:
            # how we verify peers:
            domain.set_trusted_ca_db(ca_file)
745 746
            hostname = properties.get('x-ssl-peer-name',
                                      properties.get('hostname'))
747 748 749 750 751
            vdefault = 'verify-peer' if hostname else 'verify-cert'
            vmode = verify_modes.get(properties.get('x-ssl-verify-mode',
                                                    vdefault))
            # check for configuration error
            if not vmode:
752
                raise proton.SSLException("bad value for x-ssl-verify-mode")
753
            if vmode == proton.SSLDomain.VERIFY_PEER_NAME and not hostname:
754
                raise proton.SSLException("verify-peer needs x-ssl-peer-name")
755 756 757 758 759 760 761
            domain.set_peer_authentication(vmode, ca_file)
        if mode == proton.SSLDomain.MODE_SERVER:
            if properties.get('x-ssl-allow-cleartext'):
                domain.allow_unsecured_client()
        pn_ssl = proton.SSL(self._pn_transport, domain)
        if hostname:
            pn_ssl.peer_hostname = hostname
762
        LOG.debug("SSL configured for connection %s", self._name)
763
        return pn_ssl
764 765 766

    def _add_timer(self, deadline, callback):
        callbacks = self._timers.get(deadline)
Kenneth Giusti's avatar
Kenneth Giusti committed
767
        if callbacks is None:
768 769 770 771 772
            callbacks = set()
            self._timers[deadline] = callbacks
            heapq.heappush(self._timers_heap, deadline)
            if deadline < self._next_deadline:
                self._next_deadline = deadline
Kenneth Giusti's avatar
Kenneth Giusti committed
773
        callbacks.add(callback)
774 775 776 777 778 779 780 781 782 783 784 785

    def _cancel_timer(self, deadline, callback):
        callbacks = self._timers.get(deadline)
        if callbacks:
            callbacks.discard(callback)
        # next expire will discard empty deadlines

    def _expire_timers(self, now):
        while (self._timers_heap and
               self._timers_heap[0] <= now):
            deadline = heapq.heappop(self._timers_heap)
            callbacks = self._timers.get(deadline)
Kenneth Giusti's avatar
Kenneth Giusti committed
786 787
            while callbacks:
                callbacks.pop()()
788 789
            del self._timers[deadline]

790
        return self._timers_heap[0] if self._timers_heap else 0
Kenneth Giusti's avatar
Kenneth Giusti committed
791

792 793 794 795 796
    # Proton's event model was changed after 0.7
    if (_PROTON_VERSION >= (0, 8)):
        # proton connection events that map directly onto an endpoint
        # state machine event
        _endpoint_event_map = {
            proton.Event.CONNECTION_REMOTE_OPEN: Endpoint.REMOTE_OPENED,
            proton.Event.CONNECTION_REMOTE_CLOSE: Endpoint.REMOTE_CLOSED,
            proton.Event.CONNECTION_LOCAL_OPEN: Endpoint.LOCAL_OPENED,
            proton.Event.CONNECTION_LOCAL_CLOSE: Endpoint.LOCAL_CLOSED}

        def _handle_proton_event(self, pn_event):
            """Process a proton event addressed to this connection.

            Returns True if the event type was recognized and handled,
            False otherwise.
            """
            kind = pn_event.type
            ep_event = Connection._endpoint_event_map.get(kind)
            if ep_event is not None:
                self._process_endpoint_event(ep_event)
                return True  # handled
            if kind == proton.Event.CONNECTION_INIT:
                LOG.debug("Connection created: %s", pn_event.context)
                return True  # handled
            if kind == proton.Event.CONNECTION_FINAL:
                LOG.debug("Connection finalized: %s", pn_event.context)
                return True  # handled
            if kind == proton.Event.TRANSPORT_ERROR:
                self._connection_failed(str(self._pn_transport.condition))
                return True  # handled
            return False  # unknown
    elif hasattr(proton.Event, "CONNECTION_LOCAL_STATE"):
        # 0.7 proton event model
        def _handle_proton_event(self, pn_event):
            """Process a proton event addressed to this connection.

            Returns True if the event type was recognized and handled,
            False otherwise.
            """
            kind = pn_event.type
            if kind == proton.Event.CONNECTION_LOCAL_STATE:
                self._process_local_state()
                return True  # handled
            if kind == proton.Event.CONNECTION_REMOTE_STATE:
                self._process_remote_state()
                return True  # handled
            return False  # unknown
    else:
        raise Exception("The installed version of Proton is not supported.")

Kenneth Giusti's avatar
Kenneth Giusti committed
826 827 828 829 830
    # endpoint state machine actions:

    def _ep_active(self):
        """Both ends of the Endpoint have become active.

        Notifies the event handler, if one is set, through its
        connection_active() callback.
        """
        LOG.debug("Connection is up")
        if not self._handler:
            return
        # the callback runs while the callback lock is held
        with self._callback_lock:
            self._handler.connection_active(self)
Kenneth Giusti's avatar
Kenneth Giusti committed
834 835 836 837 838 839

    def _ep_need_close(self):
        """The remote has closed its end of the endpoint.

        Notifies the event handler, if one is set, through its
        connection_remote_closed() callback, passing along the remote
        condition read from the proton connection.
        """
        LOG.debug("Connection remotely closed")
        if not self._handler:
            return
        remote_cond = self._pn_connection.remote_condition
        # the callback runs while the callback lock is held
        with self._callback_lock:
            self._handler.connection_remote_closed(self, remote_cond)
842

843
    def _ep_error(self, error):
        """The endpoint state machine failed due to protocol error.

        Delegates to the base class handler first, then marks the
        connection as failed.
        """
        reason = "Protocol error occurred."
        super(Connection, self)._ep_error(error)
        self._connection_failed(reason)
847 848 849 850 851 852 853 854 855 856 857 858 859 860

    # order by name

    def __lt__(self, other):
        return self.name < other.name

    def __le__(self, other):
        return self < other or self.name == other.name

    def __gt__(self, other):
        return self.name > other.name

    def __ge__(self, other):
        return self > other or self.name == other.name