source: pyutilib.pyro/trunk/pyutilib/pyro/dispatcher.py @ 2831

Revision 2831, 4.6 KB checked in by wehart, 2 years ago (diff)

Python 3.x portability fixes.

Line 
1#  _________________________________________________________________________
2#
3#  PyUtilib: A Python utility library.
4#  Copyright (c) 2008 Sandia Corporation.
5#  This software is distributed under the BSD License.
6#  Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
7#  the U.S. Government retains certain rights in this software.
8#  _________________________________________________________________________
9
10__all__ = ['Dispatcher', 'DispatcherServer']
11
12import Pyro.core
13import Pyro.naming
14from Pyro.errors import NamingError
15from Queue import Queue
16from pyutilib.pyro.util import *
17
class Dispatcher(Pyro.core.ObjBase):
    """Pyro object that brokers tasks and results through FIFO queues.

    Two families of queues are kept: one for tasks and one for results.
    Each family has a default queue (selected with ``type=None``) plus any
    number of named queues, created lazily the first time something is
    added under a given ``type``.
    """

    def __init__(self, **kwds):
        """Create the empty default queues and the named-queue tables.

        Keyword arguments:
            verbose: when exactly ``True``, every request is echoed to
                stdout. Defaults to ``False``.
        """
        Pyro.core.ObjBase.__init__(self)
        self.default_taskqueue = Queue()
        self.default_resultqueue = Queue()
        # Named queues, keyed by the ``type`` value supplied by callers.
        self.taskqueue = {}
        self.resultqueue = {}
        self.verbose = kwds.get("verbose", False)
        if self.verbose is True:
            print("Verbose output enabled...")

    def shutdown(self):
        """Ask the daemon this object is attached to to shut down."""
        print("Dispatcher received request to shut down - initiating...")
        self.getDaemon().shutdown()

    def add_task(self, task, type=None):
        """Append ``task`` to the task queue selected by ``type``.

        A named queue is created on first use; ``type=None`` selects the
        default task queue.
        """
        if self.verbose is True:
            print("Received request to add task="+str(task)+"; queue type="+str(type))
        if type is None:
            self.default_taskqueue.put(task)
        else:
            if type not in self.taskqueue:
                self.taskqueue[type] = Queue()
            self.taskqueue[type].put(task)

    def get_task(self, type=None, timeout=5):
        """Remove and return the next task from the selected queue.

        Blocks for up to ``timeout`` seconds waiting for a task; the
        queue's "empty" exception propagates to the caller if none arrives
        in time. Returns ``None`` immediately when ``type`` names a queue
        that has never been created.
        """
        if self.verbose is True:
            print("Received request for a task from queue type="+str(type)+"; timeout="+str(timeout)+" seconds")
        if type is None:
            return self.default_taskqueue.get(block=True, timeout=timeout)
        else:
            try:
                return self.taskqueue[type].get(block=True, timeout=timeout)
            except KeyError:
                return None

    def add_result(self, data, type=None):
        """Append ``data`` to the result queue selected by ``type``.

        A named queue is created on first use; ``type=None`` selects the
        default result queue.
        """
        if self.verbose is True:
            print("Received request to add result with data="+str(data)+"; queue type="+str(type))
        if type is None:
            self.default_resultqueue.put(data)
        else:
            if type not in self.resultqueue:
                self.resultqueue[type] = Queue()
            self.resultqueue[type].put(data)

    def get_result(self, type=None, timeout=5):
        """Remove and return the next result from the selected queue.

        Blocks for up to ``timeout`` seconds waiting for a result; the
        queue's "empty" exception propagates to the caller if none arrives
        in time. Returns ``None`` immediately when ``type`` names a queue
        that has never been created.
        """
        if self.verbose is True:
            print("Received request for result with queue type="+str(type)+"; timeout="+str(timeout))
        if type is None:
            return self.default_resultqueue.get(block=True, timeout=timeout)
        else:
            try:
                return self.resultqueue[type].get(block=True, timeout=timeout)
            except KeyError:
                return None

    def num_tasks(self, type=None):
        """Return the approximate number of pending tasks in a queue.

        Returns 0 when ``type`` names a queue that has never been created.
        The queue is only inspected; no task is removed.
        """
        if self.verbose is True:
            print("Received request for number of tasks in queue with type="+str(type))
        if type is None:
            return self.default_taskqueue.qsize()
        else:
            try:
                # Report the size; a get() here would consume a task and
                # referenced an undefined timeout.
                return self.taskqueue[type].qsize()
            except KeyError:
                return 0

    def num_results(self, type=None):
        """Return the approximate number of pending results in a queue.

        Returns 0 when ``type`` names a queue that has never been created.
        The queue is only inspected; no result is removed.
        """
        if self.verbose is True:
            print("Received request for number of results in queue with type="+str(type))
        if type is None:
            return self.default_resultqueue.qsize()
        else:
            try:
                # Report the size; a get() here would consume a result and
                # referenced an undefined timeout.
                return self.resultqueue[type].qsize()
            except KeyError:
                return 0

    def queues_with_results(self):
        """Return the set of result-queue names that currently hold data.

        The default result queue is represented by ``None`` in the
        returned set.
        """
        if self.verbose is True:
            print("Received request for the set of queues with results")
        result = set()
        if self.default_resultqueue.qsize() > 0:
            result.add(None)
        for queue_name, result_queue in self.resultqueue.items():
            if result_queue.qsize() > 0:
                result.add(queue_name)
        return result
108
def DispatcherServer(group=":PyUtilibServer", host=None, verbose=False):
    """Register a Dispatcher with the name server and serve requests.

    Arguments:
        group: name-server group under which the dispatcher is registered
            as ``<group>.dispatcher``.
        host: passed through to ``get_nameserver`` to locate the name
            server.
        verbose: forwarded to the ``Dispatcher`` constructor.

    Returns ``None`` without starting anything when ``get_nameserver``
    yields ``None``; otherwise runs the daemon's request loop.
    """
    name_server = get_nameserver(host)
    if name_server is None:
        return

    pyro_daemon = Pyro.core.Daemon()
    pyro_daemon.useNameServer(name_server)

    dispatcher_name = group + ".dispatcher"

    # Best-effort name-server preparation: create the group, then drop any
    # previous registration of the dispatcher name. A NamingError from
    # either call is ignored.
    for method_name, target in (("createGroup", group),
                                ("unregister", dispatcher_name)):
        try:
            getattr(name_server, method_name)(target)
        except NamingError:
            pass

    # the default value of 200 on clusters wasn't working well for large jobs.
    Pyro.config.PYRO_MAXCONNECTIONS = 1000

    pyro_daemon.connect(Dispatcher(verbose=verbose), dispatcher_name)

    print("Dispatcher is ready.")
    pyro_daemon.requestLoop()
Note: See TracBrowser for help on using the repository browser.