Python Messaging Example

Objectives

Details about program

There are two separate apps.

  1. An Agent runs on a remote box and responds to requests
    • get_owner - Return the current owner of the box
    • set_owner - Set the current owner of the box
    • get_info - Returns info about the box (using platform uname)
  2. A Beaker command line app that sends the above requests to an agent and displays the responses

Installation

  1. Google search for
    qpid proton
  2. First result is http://qpid.apache.org/proton/
  3. Found Source Repository link.
  4. Installed using svn
  5. Found README and followed instructions.

Notes

This mostly went smoothly except for make install.

As eallen

CMake Error at cmake_install.cmake:36 (FILE):
              file INSTALL cannot copy file "/home/remote/eallen/proton/LICENSE" to
              "/usr/share/proton-0.4/LICENSE".

As root

UNEXPECTED ERROR:
            [Errno 13] Permission denied: '/home/remote/eallen/proton/build/proton-c/bindings/python/html/epydoc.css'
Tried using both
cmake -DCMAKE_INSTALL_PREFIX=/usr ..
and
cmake -DCMAKE_INSTALL_PREFIX=/home/remote/eallen/myproton ..

Development

I quickly found the examples/messenger/py/* applications. I went through the README, and send.py and recv.py worked as advertised.

However, since I needed a request/response pattern, I used the client.py and server.py as a starting point.

Design

  • Send command in message subject
  • Send any arguments in message properties
  • Return data in message properties
  • Use built-in reply-to as address of returned message

Notes

The only development problem I encountered was returning a list of values that contained a Python None: no values after the None were received.

Output

$ agent.py -v
subscribed to amqp://~0.0.0.0:5678
get_info returned [0, ('Linux', 'redhat', '3.8.13-100.fc17.x86_64', '#1 SMP Mon May 13 13:36:17 UTC 2013', 'x86_64', 'x86_64')]
$ beaker.py 0.0.0.0:5678 get_info
Re: get_info {'result': [0L, ['Linux', 'redhat', '3.8.13-100.fc17.x86_64', '#1 SMP Mon May 13 13:36:17 UTC 2013', 'x86_64', 'x86_64']]}

Code

Agent

#!/usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys, optparse
import platform
import inspect
from proton import *


class System:
    """State of the box this agent runs on, plus its request handlers.

    Each public method is a command handler: it receives the incoming
    request message and returns a ``(code, value)`` pair, where ``code`` is
    0 on success and -1 on failure.
    """

    def __init__(self, opts):
        # opts (the parsed command-line options) is accepted but not used.
        self.owner = None
        self.info = platform.uname()

    def get_owner(self, request):
        """Return the current owner and record it in the request's properties."""
        _set_property(request, "owner", self.owner)
        return 0, self.owner

    def set_owner(self, request):
        """Set the owner from the request's ``owner`` property.

        Returns ``(0, owner)`` on success. Returns ``(-1, message)`` and
        leaves the current owner untouched when the request carries no
        properties or the properties have no ``owner`` entry.
        """
        try:
            # Index rather than .get(): a missing key must be reported as an
            # error, not silently reset the owner to None.
            owner = request.properties["owner"]
        except (AttributeError, TypeError, KeyError):
            return -1, "message.properties must contain 'owner' when calling set_owner"
        self.owner = owner
        return 0, owner

    def clear_owner(self, request):
        """Reset the owner to None."""
        self.owner = None
        return 0, None

    def get_info(self, request):
        """Return the platform.uname() value captured at construction time."""
        return 0, self.info

    def get_capabilities(self, request):
        """Return the names of all methods that do not start with an underscore."""
        methods = inspect.getmembers(self, predicate=inspect.ismethod)
        public = [name for name, _ in methods if not name.startswith("_")]
        return 0, public

def _set_property(message, key, value):
    if message.properties is None:
        message.properties = dict()
    message.properties[key] = value


class BeakerAgent(object):
    """Agent that receives requests and answers them by calling System methods.

    The request subject names the System method to run; the outcome is sent
    back to the request's reply-to address as ``properties["result"]``,
    a ``[code, value]`` list.
    """

    def dispatch(self, request, response):
        """Address ``response`` as the reply to ``request``.

        Copies the reply-to address, correlation id and body from the
        request and sets the subject to ``"Re: <request subject>"``.
        """
        response.address = request.reply_to
        response.correlation_id = request.correlation_id
        response.body = request.body
        # The same response object is reused for every request, so the
        # subject is always assigned; otherwise a request without a subject
        # would be answered with the previous reply's "Re: ..." subject.
        if request.subject:
            response.subject = "Re: %s" % request.subject
        else:
            response.subject = None

    def subscribe(self, messenger, addresses, verbose):
        """Subscribe ``messenger`` to every address in ``addresses``.

        Individual failures are tolerated as long as at least one
        subscription succeeds.

        Raises:
            MessengerException: the last subscription error, when every
                address failed.
            Exception: when ``addresses`` is empty.
        """
        subscribed = False
        last_error = None

        for a in addresses:
            try:
                messenger.subscribe(a)
            except MessengerException as e:
                last_error = e
                if verbose:
                    print("failed to subscribe to %s (%s)" % (a, e))
            else:
                subscribed = True
                if verbose:
                    print("subscribed to %s" % a)

        if not subscribed:
            if last_error is not None:
                raise last_error
            raise Exception("not subscribed to an address")

    def parse_cmd_line(self):
        """Parse sys.argv; return ``(opts, args)`` where args are the addresses."""
        parser = optparse.OptionParser(
            usage="usage: %prog [options] <addr_1> ... <addr_n>",
            description="sample proton application")
        parser.add_option("-v", "--verbose",
                          action="store_true", dest="verbose", default=False,
                          help="print log messages to stdout")
        return parser.parse_args()

    def _invoke(self, system, request):
        """Run the ``system`` method named by the request subject.

        Returns a ``(code, value)`` pair and never raises for a bad request:
        an unknown, missing or underscore-prefixed subject yields
        ``(-1, "<subject> not implemented")``, and a handler that raises
        yields ``(-1, "<subject> failed: <error>")``.
        """
        subject = request.subject
        method = None
        try:
            # Only public names may be called remotely; this keeps subjects
            # such as "__init__" or "__class__" from reaching getattr().
            if subject and not subject.startswith("_"):
                method = getattr(system, subject, None)
        except (AttributeError, TypeError, UnicodeError):
            # The subject is not usable as an attribute name.
            method = None

        if not callable(method):
            return -1, "%s not implemented" % subject

        try:
            code, value = method(request)
        except Exception as e:
            # One bad request must not take the whole agent down.
            return -1, "%s failed: %s" % (subject, e)
        return code, value

    def _serve(self, messenger, system, verbose):
        """Receive requests and send replies until interrupted from the keyboard."""
        request = Message()
        reply = Message()

        running = True
        while running:
            if messenger.incoming < 2:
                try:
                    messenger.recv(2)
                except KeyboardInterrupt:
                    running = False
                    if verbose:
                        print("KeyboardInterrupt. Exiting")

            if messenger.incoming > 0:
                messenger.get(request)
                # Requests without a reply-to address cannot be answered
                # and are dropped.
                if request.reply_to:
                    code, value = self._invoke(system, request)
                    if verbose:
                        print("%s returned [%d, %s]" % (request.subject, code, value))
                    _set_property(reply, "result", [code, value])
                    self.dispatch(request, reply)
                    messenger.put(reply)
                    messenger.send()

    def main(self):
        """Run the agent; return 0 on a clean exit and -1 on error.

        Addresses come from the command line and default to
        ``amqp://~0.0.0.0:5678``. Errors are written to stderr.
        """
        try:
            opts, args = self.parse_cmd_line()

            system = System(opts)

            if not args:
                args = ["amqp://~0.0.0.0:5678"]

            messenger = Messenger()
            messenger.start()
            try:
                self.subscribe(messenger, args, opts.verbose)
                self._serve(messenger, system, opts.verbose)
            finally:
                # Stop the messenger on every exit path, not only the clean one.
                messenger.stop()
            return 0

        except Exception as e:
            sys.stderr.write("%s\n" % e)
            return -1

def daemon_main():
    """Run a BeakerAgent and exit the process with the status its main() returns."""
    exit_status = BeakerAgent().main()
    sys.exit(exit_status)

# Start the agent only when this file is executed as a script, not on import.
if __name__ == "__main__":
    daemon_main()
    

Beaker

#!/usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys
from optparse import OptionParser, OptionGroup, IndentedHelpFormatter
import getpass
from proton import *

# Usage text shown by --help and on argument errors. It is printed verbatim,
# so the layout below is what the user sees. The synopsis names the two
# required positional arguments.
usage = """
Usage:  %prog [options] <addr> <command>
        addr: address of system
        command: get_owner | set_owner | clear_owner | get_info | get_capabilities
"""

# Description text shown by --help, also printed verbatim.
description = """
Communicates with a beaker-agent (beaker-agent.py) running at an address.

Examples:
$ %prog 0.0.0.0 get_owner => asks the agent at 0.0.0.0 to return its owner
$ %prog mrg17.lab.bos.redhat.com:5673 get_info => asks the agent at mrg17.lab.bos.redhat.com:5673 to return its system info
$ %prog mrg17.lab.bos.redhat.com:5673 set_owner => tells the agent at mrg17.lab.bos.redhat.com:5673 to set its owner to your username

"""
class HelpFormatter(IndentedHelpFormatter):
    """Help formatter that passes usage and description text through as written.

    Overriding both hooks keeps the newlines of hand-formatted, multi-line
    usage and description strings intact.
    """

    def format_usage(self, usage):
        """Return the usage string exactly as supplied."""
        return usage

    def format_description(self, description):
        """Return the description plus a trailing newline, or "" when it is empty."""
        return description + "\n" if description else ""

# Command-line client: send one command to an agent and print its reply.
parser = OptionParser(usage=usage, description=description, formatter=HelpFormatter())

parser.add_option("-r", "--reply_to", default="~/replies",
                  help="address: [amqp://]<domain>[/<name>] (default %default)")

opts, args = parser.parse_args()

# Exactly two positional arguments are required: the agent address and the command.
if len(args) != 2:
    parser.error("incorrect number of arguments")

address, command = args

mng = Messenger()
mng.start()

# Stop the messenger on every exit path, including a failed send or receive.
try:
    msg = Message()
    msg.address = address
    # The command travels in the subject; arguments travel in the properties.
    msg.subject = command
    if command == "set_owner":
        user = getpass.getuser()
        msg.properties = {"owner": user}

    msg.reply_to = opts.reply_to

    mng.put(msg)
    mng.send()

    # A reply is only awaited when the reply-to address starts with "~/".
    if opts.reply_to.startswith("~/"):
        mng.recv(1)
        try:
            mng.get(msg)
            print("%s %s" % (msg.subject, msg.properties))
        except Exception as e:
            print(e)
finally:
    mng.stop()