Class | MCollective::RPC::Client
In: | lib/mcollective/rpc/client.rb
Parent: | Object
The main component of the Simple RPC client system. It wraps MCollective::Client and brings in a lot of conventions and standard approaches.
agent | [R]
batch_mode | [R]
batch_size | [R]
batch_sleep_time | [R]
client | [R]
config | [RW]
ddl | [R]
discovery_timeout | [RW]
filter | [RW]
limit_method | [R]
limit_targets | [R]
output_format | [R]
progress | [RW]
reply_to | [RW]
stats | [R]
timeout | [RW]
ttl | [RW]
verbose | [RW]
Creates a stub for a remote agent. You can pass in an options hash via the :options flag, which will then be used; otherwise a default options hash is created with filtering enabled, based on the standard command line use.
rpc = RPC::Client.new("rpctest", :configfile => "client.cfg", :options => options)
You typically would not call this directly. You'd use MCollective::RPC#rpcclient instead, which is a wrapper around this that can be used as a mixin.
# File lib/mcollective/rpc/client.rb, line 19
def initialize(agent, flags = {})
  if flags.include?(:options)
    initial_options = flags[:options]

  elsif @@initial_options
    initial_options = Marshal.load(@@initial_options)

  else
    oparser = MCollective::Optionparser.new({:verbose => false, :progress_bar => true, :mcollective_limit_targets => false, :batch_size => nil, :batch_sleep_time => 1}, "filter")

    initial_options = oparser.parse do |parser, opts|
      if block_given?
        yield(parser, opts)
      end

      Helpers.add_simplerpc_options(parser, opts)
    end

    @@initial_options = Marshal.dump(initial_options)
  end

  @stats = Stats.new
  @agent = agent
  @discovery_timeout = initial_options[:disctimeout]
  @timeout = initial_options[:timeout]
  @verbose = initial_options[:verbose]
  @filter = initial_options[:filter]
  @config = initial_options[:config]
  @discovered_agents = nil
  @progress = initial_options[:progress_bar]
  @limit_targets = initial_options[:mcollective_limit_targets]
  @limit_method = Config.instance.rpclimitmethod
  @output_format = initial_options[:output_format] || :console
  @force_direct_request = false
  @reply_to = initial_options[:reply_to]

  @batch_size = Integer(initial_options[:batch_size] || 0)
  @batch_sleep_time = Float(initial_options[:batch_sleep_time] || 1)
  @batch_mode = @batch_size > 0

  agent_filter agent

  @client = MCollective::Client.new(@config)
  @client.options = initial_options

  @collective = @client.collective
  @ttl = initial_options[:ttl] || Config.instance.ttl

  # if we can find a DDL for the service override
  # the timeout of the client so we always magically
  # wait appropriate amounts of time.
  #
  # We add the discovery timeout to the ddl supplied
  # timeout as the discovery timeout tends to be tuned
  # for local network conditions and fact source speed
  # which would other wise not be accounted for and
  # some results might get missed.
  #
  # We do this only if the timeout is the default 5
  # seconds, so that users cli overrides will still
  # get applied
  begin
    @ddl = DDL.new(agent)
    @timeout = @ddl.meta[:timeout] + @discovery_timeout if @timeout == 5
  rescue Exception => e
    Log.debug("Could not find DDL: #{e}")
    @ddl = nil
  end

  # allows stderr and stdout to be overridden for testing
  # but also for web apps that might not want a bunch of stuff
  # generated to actual file handles
  if initial_options[:stderr]
    @stderr = initial_options[:stderr]
  else
    @stderr = STDERR
    @stderr.sync = true
  end

  if initial_options[:stdout]
    @stdout = initial_options[:stdout]
  else
    @stdout = STDOUT
    @stdout.sync = true
  end
end
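The timeout rule at the end of the constructor can be looked at in isolation. The following is a minimal sketch and not the library's code: effective_timeout and DEFAULT_TIMEOUT are hypothetical names, and the DDL timeout is passed in as a plain number (nil when no DDL could be found).

```ruby
# Hypothetical stand-alone sketch of the constructor's timeout rule.
DEFAULT_TIMEOUT = 5 # the default value the constructor compares against

# timeout           - timeout taken from the parsed options
# ddl_timeout       - timeout declared in the agent DDL, nil if no DDL was found
# discovery_timeout - discovery timeout taken from the parsed options
def effective_timeout(timeout, ddl_timeout, discovery_timeout)
  # A timeout other than the default is treated as a user override and kept.
  return timeout unless timeout == DEFAULT_TIMEOUT
  # Without a DDL there is nothing to derive a better value from.
  return timeout if ddl_timeout.nil?

  ddl_timeout + discovery_timeout
end

puts effective_timeout(5, 60, 2)   # DDL timeout plus discovery timeout
puts effective_timeout(30, 60, 2)  # explicit override is kept
puts effective_timeout(5, nil, 2)  # no DDL, default is kept
```

One consequence of the rule as listed is that a user who explicitly asks for a timeout of 5 seconds is indistinguishable from a user who kept the default.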
Sets the agent filter
# File lib/mcollective/rpc/client.rb, line 347
def agent_filter(agent)
  @filter["agent"] << agent
  @filter["agent"].compact!
  reset
end
Sets the batch size. Setting the size to 0 disables batch mode.
# File lib/mcollective/rpc/client.rb, line 519
def batch_size=(limit)
  raise "Can only set batch size if direct addressing is supported" unless Config.instance.direct_addressing

  @batch_size = Integer(limit)
  @batch_mode = @batch_size > 0
end
# File lib/mcollective/rpc/client.rb, line 526
def batch_sleep_time=(time)
  raise "Can only set batch sleep time if direct addressing is supported" unless Config.instance.direct_addressing

  @batch_sleep_time = Float(time)
end
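The behaviour of the two batch setters can be tried without a running collective. This is a sketch under assumptions: BatchSettings is a hypothetical class, and its direct_addressing constructor argument stands in for the Config.instance.direct_addressing lookup.

```ruby
# Hypothetical stand-in for the two batch setters.
class BatchSettings
  attr_reader :batch_size, :batch_sleep_time, :batch_mode

  def initialize(direct_addressing)
    @direct_addressing = direct_addressing # replaces the Config lookup
    @batch_size = 0
    @batch_sleep_time = 1.0
    @batch_mode = false
  end

  def batch_size=(limit)
    raise "Can only set batch size if direct addressing is supported" unless @direct_addressing

    @batch_size = Integer(limit)    # accepts 5 as well as "5"
    @batch_mode = @batch_size > 0   # a size of 0 switches batching off
  end

  def batch_sleep_time=(time)
    raise "Can only set batch sleep time if direct addressing is supported" unless @direct_addressing

    @batch_sleep_time = Float(time)
  end
end

settings = BatchSettings.new(true)
settings.batch_size = "5"
p settings.batch_mode
settings.batch_size = 0
p settings.batch_mode
```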
Sets the class filter
# File lib/mcollective/rpc/client.rb, line 323
def class_filter(klass)
  @filter["cf_class"] << klass
  @filter["cf_class"].compact!
  reset
end
Sets the collective we are communicating with
# File lib/mcollective/rpc/client.rb, line 487
def collective=(c)
  @collective = c
  @client.options[:collective] = c
end
Set a compound filter
# File lib/mcollective/rpc/client.rb, line 361
def compound_filter(filter)
  @filter["compound"] << Matcher::Parser.new(filter).execution_stack
  reset
end
Constructs custom requests with custom filters and discovery data. The idea is that this would be used in web applications, where you might be using a cached copy of data provided by a registration agent to figure out on your own which nodes will respond and what your filter should be.
This will help you essentially short circuit the traditional cycle of:
mc discover / call / wait for discovered nodes
by doing discovery however you like, constructing a filter and a list of nodes you expect responses from.
Other than that it will work exactly like a normal call: blocks will behave the same way, stats will be handled the same way, etc.
If you just wanted to contact one machine, for example with a client that already has other filter options set up, you can do:
puppet.custom_request("runonce", {}, ["your.box.com"], {"identity" => "your.box.com"})
This will do the runonce action on just 'your.box.com'. No discovery will be done, and after receiving just one response it will stop waiting for responses.
If direct_addressing is enabled in the config file you can provide an empty hash as a filter. This forces the request to be a directly addressed request, which technically does not need filters. If you try to use this mode with direct addressing disabled an exception will be raised.
# File lib/mcollective/rpc/client.rb, line 274
def custom_request(action, args, expected_agents, filter = {}, &block)
  @ddl.validate_request(action, args) if @ddl

  if filter == {} && !Config.instance.direct_addressing
    raise "Attempted to do a filterless custom_request without direct_addressing enabled, preventing unexpected call to all nodes"
  end

  @stats.reset

  custom_filter = Util.empty_filter
  custom_options = options.clone

  # merge the supplied filter with the standard empty one
  # we could just use the merge method but I want to be sure
  # we dont merge in stuff that isnt actually valid
  ["identity", "fact", "agent", "cf_class", "compound"].each do |ftype|
    if filter.include?(ftype)
      custom_filter[ftype] = [filter[ftype], custom_filter[ftype]].flatten
    end
  end

  # ensure that all filters at least restrict the call to the agent we're a proxy for
  custom_filter["agent"] << @agent unless custom_filter["agent"].include?(@agent)
  custom_options[:filter] = custom_filter

  # Fake out the stats discovery would have put there
  @stats.discovered_agents([expected_agents].flatten)

  # Handle fire and forget requests
  #
  # If a specific reply-to was set then from the client perspective this should
  # be a fire and forget request too since no response will ever reach us - it
  # will go to the reply-to destination
  if args[:process_results] == false || @reply_to
    return fire_and_forget_request(action, args, custom_filter)
  end

  # Now do a call pretty much exactly like in method_missing except with our own
  # options and discovery magic
  if block_given?
    call_agent(action, args, custom_options, [expected_agents].flatten) do |r|
      block.call(r)
    end
  else
    call_agent(action, args, custom_options, [expected_agents].flatten)
  end
end
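The filter merge in the listing above only looks at a fixed set of string keys. The sketch below reproduces that step on its own so the effect can be observed; merge_filter and empty_filter are hypothetical helpers, and empty_filter is an assumed stand-in for Util.empty_filter (one empty list per filter type).

```ruby
# Hypothetical stand-alone version of the filter merge in custom_request.
FILTER_TYPES = ["identity", "fact", "agent", "cf_class", "compound"]

# Assumed shape of Util.empty_filter: one empty list per filter type.
def empty_filter
  FILTER_TYPES.each_with_object({}) { |ftype, h| h[ftype] = [] }
end

def merge_filter(supplied, agent)
  merged = empty_filter

  # Only the known string keys are copied, anything else is ignored.
  FILTER_TYPES.each do |ftype|
    merged[ftype] = [supplied[ftype], merged[ftype]].flatten if supplied.include?(ftype)
  end

  # Always restrict the call to the agent this client is a proxy for.
  merged["agent"] << agent unless merged["agent"].include?(agent)
  merged
end

p merge_filter({"identity" => "your.box.com"}, "puppet")
p merge_filter({:identity => "your.box.com"}, "puppet")
```

In this sketch the second call leaves the identity list empty, because a symbol key does not match any of the string keys that are checked.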
Disconnects cleanly from the middleware
# File lib/mcollective/rpc/client.rb, line 107
def disconnect
  @client.disconnect
end
Does discovery based on the filters set. If a discovery was previously done, that result is returned; otherwise a new discovery is performed.
Alternatively, if identity filters are given and none of them are regular expressions, the provided data is used as discovered data, avoiding discovery.
If direct_addressing is enabled, the discovery data can be supplied directly by passing in an array of nodes with :nodes, or JSON data like that produced by the mcollective RPC JSON output with :json.
Will show a message indicating it's doing discovery if running verbose or if the :verbose flag is passed in.
Use reset to force a new discovery
# File lib/mcollective/rpc/client.rb, line 393
def discover(flags={})
  flags.keys.each do |key|
    raise "Unknown option #{key} passed to discover" unless [:verbose, :hosts, :nodes, :json].include?(key)
  end

  flags.include?(:verbose) ? verbose = flags[:verbose] : verbose = @verbose

  verbose = false unless @output_format == :console

  # flags[:nodes] and flags[:hosts] are the same thing, we should never have
  # allowed :hosts as that was inconsistent with the established terminology
  flags[:nodes] = flags.delete(:hosts) if flags.include?(:hosts)

  reset if flags[:nodes] || flags[:json]

  unless @discovered_agents
    # if either hosts or JSON is supplied try to figure out discovery data from there
    # if direct_addressing is not enabled this is a critical error as the user might
    # not have supplied filters so raise an exception
    if flags[:nodes] || flags[:json]
      raise "Can only supply discovery data if direct_addressing is enabled" unless Config.instance.direct_addressing

      hosts = []

      if flags[:nodes]
        hosts = Helpers.extract_hosts_from_array(flags[:nodes])
      elsif flags[:json]
        hosts = Helpers.extract_hosts_from_json(flags[:json])
      end

      raise "Could not find any hosts in discovery data provided" if hosts.empty?

      @discovered_agents = hosts
      @force_direct_request = true

    # if an identity filter is supplied and it is all strings no regex we can use that
    # as discovery data, technically the identity filter is then redundant if we are
    # in direct addressing mode and we could empty it out but this use case should
    # only really be for a few -I's on the CLI
    #
    # For safety we leave the filter in place for now, that way we can support this
    # enhancement also in broadcast mode
    elsif options[:filter]["identity"].size > 0
      regex_filters = options[:filter]["identity"].select{|i| i.match("^\/")}.size

      if regex_filters == 0
        @discovered_agents = options[:filter]["identity"].clone
        @force_direct_request = true if Config.instance.direct_addressing
      end
    end
  end

  # All else fails we do it the hard way using a traditional broadcast
  unless @discovered_agents
    @stats.time_discovery :start

    @stderr.print("Determining the amount of hosts matching filter for #{discovery_timeout} seconds .... ") if verbose

    # if the requested limit is a pure number and not a percent
    # and if we're configured to use the first found hosts as the
    # limit method then pass in the limit thus minimizing the amount
    # of work we do in the discover phase and speeding it up significantly
    if @limit_method == :first and @limit_targets.is_a?(Fixnum)
      @discovered_agents = @client.discover(@filter, @discovery_timeout, @limit_targets)
    else
      @discovered_agents = @client.discover(@filter, @discovery_timeout)
    end

    @force_direct_request = false
    @stderr.puts(@discovered_agents.size) if verbose

    @stats.time_discovery :end
  end

  @stats.discovered_agents(@discovered_agents)
  RPC.discovered(@discovered_agents)

  @discovered_agents
end
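The identity filter shortcut in discover depends on telling literal node names apart from regular expressions, which are written with a leading slash. A minimal sketch of that decision follows; identities_as_discovery is a hypothetical helper that works on a plain array of strings.

```ruby
# Hypothetical helper: returns the identities usable as discovery data,
# or nil when a broadcast discovery would still be needed.
def identities_as_discovery(identity_filters)
  return nil if identity_filters.empty?

  # A filter starting with "/" is a regular expression and cannot be
  # treated as a literal node name.
  regex_filters = identity_filters.select { |i| i.match("^/") }
  return nil unless regex_filters.empty?

  identity_filters.clone
end

p identities_as_discovery(["web1.example.com", "web2.example.com"])
p identities_as_discovery(["/^web/", "db1.example.com"])
```

A single regular expression among the filters is enough to fall back to normal discovery, since the full list of matching nodes is then unknown.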
Sets the fact filter
# File lib/mcollective/rpc/client.rb, line 330
def fact_filter(fact, value=nil, operator="=")
  return if fact.nil?
  return if fact == false

  if value.nil?
    parsed = Util.parse_fact_string(fact)
    @filter["fact"] << parsed unless parsed == false
  else
    parsed = Util.parse_fact_string("#{fact}#{operator}#{value}")
    @filter["fact"] << parsed unless parsed == false
  end

  @filter["fact"].compact!
  reset
end
Sets the identity filter
# File lib/mcollective/rpc/client.rb, line 354
def identity_filter(identity)
  @filter["identity"] << identity
  @filter["identity"].compact!
  reset
end
Sets and sanity checks the limit_method variable used to determine how to limit targets if limit_targets is set
# File lib/mcollective/rpc/client.rb, line 510
def limit_method=(method)
  method = method.to_sym unless method.is_a?(Symbol)

  raise "Unknown limit method #{method} must be :random or :first" unless [:random, :first].include?(method)

  @limit_method = method
end
Sets and sanity checks the limit_targets variable used to restrict how many nodes we'll target
# File lib/mcollective/rpc/client.rb, line 494
def limit_targets=(limit)
  if limit.is_a?(String)
    raise "Invalid limit specified: #{limit} valid limits are /^\d+%*$/" unless limit =~ /^\d+%*$/

    begin
      @limit_targets = Integer(limit)
    rescue
      @limit_targets = limit
    end
  else
    @limit_targets = Integer(limit)
  end
end
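The limit can therefore end up as either a number or a percentage string. The sketch below repeats the same checks in a free-standing function so the outcomes are easy to see; parse_limit is a hypothetical name and returns the value instead of storing it.

```ruby
# Hypothetical stand-alone version of the limit_targets= validation.
def parse_limit(limit)
  if limit.is_a?(String)
    raise "Invalid limit specified: #{limit}" unless limit =~ /^\d+%*$/

    begin
      Integer(limit)   # a plain count such as "10" becomes a number
    rescue ArgumentError
      limit            # a percentage such as "10%" stays a String
    end
  else
    Integer(limit)
  end
end

p parse_limit("10")
p parse_limit("10%")
p parse_limit(3)
```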
Magic handler to invoke remote methods
Once the stub is created using the constructor or the RPC#rpcclient helper you can call remote actions easily:
ret = rpc.echo(:msg => "hello world")
This will call the 'echo' action of the 'rpctest' agent and return the results as an array with one entry per responding node. Each entry is a simplified version of the usual full MCollective::Client#req result, with an additional error code and error text:
{
:sender => "remote.box.com", :statuscode => 0, :statusmsg => "OK", :data => "hello world"
}
If :statuscode is 0 then everything went fine. If it's 1 then you supplied the correct arguments etc. but the request could not be completed; you'll find a human-readable reason in :statusmsg.
Codes 2 to 5 map directly to UnknownRPCAction, MissingRPCData, InvalidRPCData and UnknownRPCError; see below for a description of those. In each case :statusmsg holds the reason for failure.
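A caller that does not pass a block typically walks the returned array and branches on :statuscode. The following sketch shows one way to do that; the result hashes are hypothetical sample data shaped like the example above, and describe and ERROR_NAMES are illustrative names rather than part of the library.

```ruby
# Status codes 2 to 5 and the error names they correspond to.
ERROR_NAMES = {
  2 => "UnknownRPCAction",
  3 => "MissingRPCData",
  4 => "InvalidRPCData",
  5 => "UnknownRPCError"
}

# Turns one simplified result into a single line of text.
def describe(result)
  case result[:statuscode]
  when 0 then "#{result[:sender]}: #{result[:data]}"
  when 1 then "#{result[:sender]}: request failed: #{result[:statusmsg]}"
  else
    name = ERROR_NAMES.fetch(result[:statuscode], "unknown status")
    "#{result[:sender]}: #{name}: #{result[:statusmsg]}"
  end
end

# Hypothetical sample data, not output from a real collective.
results = [
  {:sender => "a.example.com", :statuscode => 0, :statusmsg => "OK", :data => "hello world"},
  {:sender => "b.example.com", :statuscode => 3, :statusmsg => "msg is required", :data => nil}
]

results.each { |r| puts describe(r) }
```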
To get access to the full result of the MCollective::Client#req calls you can pass in a block:
rpc.echo(:msg => "hello world") do |resp|
  pp resp
end
In this case resp will be the result from MCollective::Client#req. Instead of receiving simple text and codes as above, you'll also need to handle the following exceptions:
UnknownRPCAction - There is no matching action on the agent
MissingRPCData - You did not supply all the needed parameters for the action
InvalidRPCData - The data you did supply did not pass validation
UnknownRPCError - Some other error prevented the agent from running
During calls a progress indicator shows how many results we've received against how many nodes were discovered. You can disable this by setting progress to false:
rpc.progress = false
This supports a second mode where it will send the SimpleRPC request and never handle the responses. It's a bit like UDP: it sends the request with the filter attached and you only get back the request id, with no indication of the results.
You can invoke this using:
puts rpc.echo(:process_results => false)
This will output just the request id.
Batched processing is supported:
printrpc rpc.ping(:batch_size => 5)
This will do everything exactly as normal but communicate with only 5 nodes at a time.
# File lib/mcollective/rpc/client.rb, line 209
def method_missing(method_name, *args, &block)
  # set args to an empty hash if nothings given
  args = args[0]
  args = {} if args.nil?

  action = method_name.to_s

  @stats.reset

  @ddl.validate_request(action, args) if @ddl

  # if a global batch size is set just use that else set it
  # in the case that it was passed as an argument
  batch_mode = args.include?(:batch_size) || @batch_mode
  batch_size = args.delete(:batch_size) || @batch_size
  batch_sleep_time = args.delete(:batch_sleep_time) || @batch_sleep_time

  # if we were given a batch_size argument thats 0 and batch_mode was
  # determined to be on via global options etc this will allow a batch_size
  # of 0 to disable or batch_mode for this call only
  batch_mode = (batch_mode && Integer(batch_size) > 0)

  # Handle single target requests by doing discovery and picking
  # a random node. Then do a custom request specifying a filter
  # that will only match the one node.
  if @limit_targets
    target_nodes = pick_nodes_from_discovered(@limit_targets)
    Log.debug("Picked #{target_nodes.join(',')} as limited target(s)")

    custom_request(action, args, target_nodes, {"identity" => /^(#{target_nodes.join('|')})$/}, &block)
  elsif batch_mode
    call_agent_batched(action, args, options, batch_size, batch_sleep_time, &block)
  else
    call_agent(action, args, options, :auto, &block)
  end
end
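The interaction between per-call batch arguments and the client-wide batch settings is easiest to follow with concrete values. This sketch isolates that part of method_missing; resolve_batch is a hypothetical function, and its default_size and default_sleep parameters stand in for the client's @batch_size and @batch_sleep_time.

```ruby
# Hypothetical stand-alone version of the per-call batch resolution.
def resolve_batch(args, default_size = 0, default_sleep = 1.0)
  default_mode = default_size > 0

  batch_mode = args.include?(:batch_size) || default_mode
  # As in the listing, the batch keys are removed from the argument hash.
  batch_size = args.delete(:batch_size) || default_size
  batch_sleep = args.delete(:batch_sleep_time) || default_sleep

  # A size of 0 switches batching off for this call only.
  batch_mode = batch_mode && Integer(batch_size) > 0

  [batch_mode, batch_size, batch_sleep]
end

p resolve_batch({:batch_size => 5})        # per-call batching, no client default
p resolve_batch({:batch_size => 0}, 10)    # client default of 10 disabled for this call
p resolve_batch({:msg => "hello"}, 10)     # client default of 10 applies
```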
Creates a suitable request hash for the SimpleRPC agent.
You'd use this if you ever wanted to take care of sending requests on your own, perhaps via Client#sendreq if you didn't care for responses.
In that case you can just do:
msg = your_rpc.new_request("some_action", :foo => :bar)
filter = your_rpc.filter
your_rpc.client.sendreq(msg, msg[:agent], filter)
This will send a SimpleRPC request to the action some_action with the arguments :foo => :bar. It will return immediately and you will have no indication at all whether the request was received.
Clearly the use of this technique should be limited and done only if your code requires such a thing
# File lib/mcollective/rpc/client.rb, line 139
def new_request(action, data)
  callerid = PluginManager["security_plugin"].callerid

  raise 'callerid received from security plugin is not valid' unless PluginManager["security_plugin"].valid_callerid?(callerid)

  {:agent => @agent,
   :action => action,
   :caller => callerid,
   :data => data}
end
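The shape of the resulting request hash can be illustrated without a security plugin. This is a sketch only: build_request is a hypothetical function that takes the caller id as an argument, and the caller id value used here is made up for the example.

```ruby
# Hypothetical stand-in for new_request; the caller id is passed in
# instead of being fetched from the security plugin.
def build_request(agent, action, data, callerid)
  {:agent => agent,
   :action => action,
   :caller => callerid,
   :data => data}
end

# "uid=500" is an invented caller id used purely for illustration.
msg = build_request("rpctest", "some_action", {:foo => :bar}, "uid=500")
p msg
```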
Provides a normal options hash like you would get from Optionparser
# File lib/mcollective/rpc/client.rb, line 475
def options
  {:disctimeout => @discovery_timeout,
   :timeout => @timeout,
   :verbose => @verbose,
   :filter => @filter,
   :collective => @collective,
   :output_format => @output_format,
   :ttl => @ttl,
   :config => @config}
end
Resets various internal parts of the class, most importantly it clears out the cached discovery
# File lib/mcollective/rpc/client.rb, line 368
def reset
  @discovered_agents = nil
end
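Because every filter setter ends with a call to reset, changing a filter always invalidates the cached discovery. The miniature below models that relationship; CachedDiscovery is a hypothetical class, and the block passed to it stands in for the real network discovery.

```ruby
# Hypothetical miniature of the discovery cache and its reset.
class CachedDiscovery
  attr_reader :lookups

  def initialize(&finder)
    @finder = finder               # stand-in for the real discovery call
    @filter = {"identity" => []}
    @discovered = nil
    @lookups = 0
  end

  # Same pattern as the filter setters: change the filter, then reset.
  def identity_filter(identity)
    @filter["identity"] << identity
    @filter["identity"].compact!
    reset
  end

  # Runs the finder only when no cached result is available.
  def discover
    unless @discovered
      @lookups += 1
      @discovered = @finder.call(@filter)
    end
    @discovered
  end

  def reset
    @discovered = nil
  end
end

cache = CachedDiscovery.new { |filter| ["a.example.com", "b.example.com"] }
cache.discover
cache.discover                     # served from the cache
cache.identity_filter("a.example.com")
cache.discover                     # filter changed, so discovery runs again
puts cache.lookups
```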