Class MCollective::Client
In: lib/mcollective/client.rb
Parent: Object

Helpers for writing clients that can talk to agents, do discovery and so forth

Methods

Attributes

discoverer  [RW] 
options  [RW] 
stats  [RW] 

Public Class methods

[Source]

    # File lib/mcollective/client.rb, line 6
 6:     def initialize(configfile)
 7:       @config = Config.instance
 8:       @config.loadconfig(configfile) unless @config.configured
 9: 
10:       @connection = PluginManager["connector_plugin"]
11:       @security = PluginManager["security_plugin"]
12: 
13:       @security.initiated_by = :client
14:       @options = nil
15:       @subscriptions = {}
16: 
17:       @discoverer = Discovery.new(self)
18:       @connection.connect
19:     end

Public Instance methods

Returns the collective specified in the options, or the configured main collective if the options do not specify one

[Source]

    # File lib/mcollective/client.rb, line 23
23:     def collective
24:       if @options[:collective].nil?
25:         @config.main_collective
26:       else
27:         @options[:collective]
28:       end
29:     end

Disconnects cleanly from the middleware

[Source]

    # File lib/mcollective/client.rb, line 32
32:     def disconnect
33:       Log.debug("Disconnecting from the middleware")
34:       @connection.disconnect
35:     end

Performs a discovery of nodes matching the filter passed and returns an array of nodes

An integer limit can be supplied; discovery is then cancelled as soon as the requested number of hosts has been reached

[Source]

     # File lib/mcollective/client.rb, line 115
115:     def discover(filter, timeout, limit=0)
116:       raise "Limit has to be an integer" unless limit.is_a?(Fixnum)
117: 
118:       compount_timeout = timeout_for_compound_filter(@options[:filter]["compound"])
119:       timeout = timeout + compount_timeout
120: 
121:       discovered = @discoverer.discover(filter, timeout, limit)
122:     end

Performs a discovery and then sends a request, running the passed block for each response

   times = discovered_req("status", "mcollectived", options) {|resp|
      pp resp
   }

It returns a hash of times. The timeouts for discovery and for the total run are taken from the options hash, which in turn is generally built using MCollective::Optionparser

[Source]

     # File lib/mcollective/client.rb, line 187
187:     def discovered_req(body, agent, options=false)
188:       stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
189: 
190:       options = @options unless options
191: 
192:       compount_timeout = timeout_for_compound_filter(options[:filter]["compound"])
193:       timeout = options[:timeout] + compount_timeout
194: 
195:       STDOUT.sync = true
196: 
197:       print("Determining the amount of hosts matching filter for #{options[:disctimeout]} seconds .... ")
198: 
199:       begin
200:         discovered_hosts = discover(options[:filter], options[:disctimeout])
201:         discovered = discovered_hosts.size
202:         hosts_responded = []
203:         hosts_not_responded = discovered_hosts
204: 
205:         stat[:discoverytime] = Time.now.to_f - stat[:starttime]
206: 
207:         puts("#{discovered}\n\n")
208:       rescue Interrupt
209:         puts("Discovery interrupted.")
210:         exit!
211:       end
212: 
213:       raise("No matching clients found") if discovered == 0
214: 
215:       begin
216:         Timeout.timeout(timeout) do
217:           reqid = sendreq(body, agent, options[:filter])
218: 
219:           (1..discovered).each do |c|
220:             resp = receive(reqid)
221: 
222:             hosts_responded << resp.payload[:senderid]
223:             hosts_not_responded.delete(resp.payload[:senderid]) if hosts_not_responded.include?(resp.payload[:senderid])
224: 
225:             yield(resp.payload)
226:           end
227:         end
228:       rescue Interrupt => e
229:       rescue Timeout::Error => e
230:       end
231: 
232:       stat[:totaltime] = Time.now.to_f - stat[:starttime]
233:       stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
234:       stat[:responses] = hosts_responded.size
235:       stat[:responsesfrom] = hosts_responded
236:       stat[:noresponsefrom] = hosts_not_responded
237:       stat[:discovered] = discovered
238: 
239:       @stats = stat
240:       return stat
241:     end

Prints out the stats returned from req and discovered_req in a nice way

[Source]

     # File lib/mcollective/client.rb, line 265
265:     def display_stats(stats, options=false, caption="stomp call summary")
266:       options = @options unless options
267: 
268:       if options[:verbose]
269:         puts("\n---- #{caption} ----")
270: 
271:         if stats[:discovered]
272:           puts("           Nodes: #{stats[:discovered]} / #{stats[:responses]}")
273:         else
274:           puts("           Nodes: #{stats[:responses]}")
275:         end
276: 
277:         printf("      Start Time: %s\n", Time.at(stats[:starttime]))
278:         printf("  Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
279:         printf("      Agent Time: %.2fms\n", stats[:blocktime] * 1000)
280:         printf("      Total Time: %.2fms\n", stats[:totaltime] * 1000)
281: 
282:       else
283:         if stats[:discovered]
284:           printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
285:         else
286:           printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
287:         end
288:       end
289: 
290:       if stats[:noresponsefrom].size > 0
291:         puts("\nNo response from:\n")
292: 
293:         stats[:noresponsefrom].each do |c|
294:           puts if c % 4 == 1
295:           printf("%30s", c)
296:         end
297: 
298:         puts
299:       end
300:     end

Blocking call that waits forever for a message to arrive.

If you give it a requestid, this means you've previously sent a request with that ID and now just want replies that match it; in that case the connection ignores all messages not directed at it and keeps waiting until it finds a matching message.

[Source]

     # File lib/mcollective/client.rb, line 85
 85:     def receive(requestid = nil)
 86:       reply = nil
 87: 
 88:       begin
 89:         reply = @connection.receive
 90:         reply.type = :reply
 91:         reply.expected_msgid = requestid
 92: 
 93:         reply.decode!
 94: 
 95:         reply.payload[:senderid] = Digest::MD5.hexdigest(reply.payload[:senderid]) if ENV.include?("MCOLLECTIVE_ANON")
 96: 
 97:         raise(MsgDoesNotMatchRequestID, "Message reqid #{requestid} does not match our reqid #{reply.requestid}") unless reply.requestid == requestid
 98:       rescue SecurityValidationFailed => e
 99:         Log.warn("Ignoring a message that did not pass security validations")
100:         retry
101:       rescue MsgDoesNotMatchRequestID => e
102:         Log.debug("Ignoring a message for some other client")
103:         retry
104:       end
105: 
106:       reply
107:     end

Sends a request and runs the passed block for each response

times = req("status", "mcollectived", options) {|resp|

  pp resp

}

It returns a hash of times. The timeout for the total run is taken from the options hash, which in turn is generally built using MCollective::Optionparser

[Source]

     # File lib/mcollective/client.rb, line 132
132:     def req(body, agent=nil, options=false, waitfor=0)
133:       if body.is_a?(Message)
134:         agent = body.agent
135:         options = body.options
136:         waitfor = body.discovered_hosts.size || 0
137:       end
138: 
139:       stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
140: 
141:       options = @options unless options
142: 
143:       compount_timeout = timeout_for_compound_filter(options[:filter]["compound"])
144:       timeout = options[:timeout] + compount_timeout
145: 
146:       STDOUT.sync = true
147: 
148:       hosts_responded = 0
149: 
150:       begin
151:         Timeout.timeout(timeout) do
152:           reqid = sendreq(body, agent, options[:filter])
153: 
154:           loop do
155:             resp = receive(reqid)
156: 
157:             hosts_responded += 1
158: 
159:             yield(resp.payload)
160: 
161:             break if (waitfor != 0 && hosts_responded >= waitfor)
162:           end
163:         end
164:       rescue Interrupt => e
165:       rescue Timeout::Error => e
166:       ensure
167:         unsubscribe(agent, :reply)
168:       end
169: 
170:       stat[:totaltime] = Time.now.to_f - stat[:starttime]
171:       stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
172:       stat[:responses] = hosts_responded
173:       stat[:noresponsefrom] = []
174: 
175:       @stats = stat
176:       return stat
177:     end

Sends a request and returns the generated request id; it doesn't wait for responses and doesn't execute any passed-in code blocks for responses

[Source]

    # File lib/mcollective/client.rb, line 39
39:     def sendreq(msg, agent, filter = {})
40:       if msg.is_a?(Message)
41:         request = msg
42:         agent = request.agent
43:       else
44:         ttl = @options[:ttl] || @config.ttl
45:         request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl})
46:         request.reply_to = @options[:reply_to] if @options[:reply_to]
47:       end
48: 
49:       request.encode!
50: 
51:       Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
52: 
53:       subscribe(agent, :reply)
54: 
55:       request.publish
56: 
57:       request.requestid
58:     end

[Source]

    # File lib/mcollective/client.rb, line 60
60:     def subscribe(agent, type)
61:       unless @subscriptions.include?(agent)
62:         subscription = Util.make_subscriptions(agent, type, collective)
63:         Log.debug("Subscribing to #{type} target for agent #{agent}")
64: 
65:         Util.subscribe(subscription)
66:         @subscriptions[agent] = 1
67:       end
68:     end

If a compound filter is specified and it contains any functions, we read the DDL for each of those plugins and sum up the timeouts declared in the DDLs

[Source]

     # File lib/mcollective/client.rb, line 246
246:     def timeout_for_compound_filter(compound_filter)
247:       return 0 if compound_filter.nil? || compound_filter.empty?
248: 
249:       timeout = 0
250: 
251:       compound_filter.each do |filter|
252:         filter.each do |statement|
253:           if statement["fstatement"]
254:             pluginname = Data.pluginname(statement["fstatement"]["name"])
255:             ddl = DDL.new(pluginname, :data)
256:             timeout += ddl.meta[:timeout]
257:           end
258:         end
259:       end
260: 
261:       timeout
262:     end

[Source]

    # File lib/mcollective/client.rb, line 70
70:     def unsubscribe(agent, type)
71:       if @subscriptions.include?(agent)
72:         subscription = Util.make_subscriptions(agent, type, collective)
73:         Log.debug("Unsubscribing #{type} target for #{agent}")
74: 
75:         Util.unsubscribe(subscription)
76:         @subscriptions.delete(agent)
77:       end
78:     end

[Validate]