Parent

Class/Module Index [+]

Quicksearch

MCollective::Client

Helpers for writing clients that can talk to agents, do discovery and so forth

Attributes

discoverer[RW]
options[RW]
stats[RW]

Public Class Methods

new(configfile) click to toggle source
# File lib/mcollective/client.rb, line 6
def initialize(configfile)
  @config = Config.instance
  @config.loadconfig(configfile) unless @config.configured

  @connection = PluginManager["connector_plugin"]
  @security = PluginManager["security_plugin"]

  @security.initiated_by = :client
  @options = nil
  @subscriptions = {}

  @discoverer = Discovery.new(self)
  @connection.connect
end
request_sequence() click to toggle source
# File lib/mcollective/client.rb, line 22
def self.request_sequence
  @@request_sequence
end

Public Instance Methods

collective() click to toggle source

Returns the configured main collective if no specific collective is specified as options

# File lib/mcollective/client.rb, line 28
def collective
  if @options[:collective].nil?
    @config.main_collective
  else
    @options[:collective]
  end
end
createreq(msg, agent, filter ={}) click to toggle source
# File lib/mcollective/client.rb, line 53
def createreq(msg, agent, filter ={})
  if msg.is_a?(Message)
    request = msg
    agent = request.agent
  else
    ttl = @options[:ttl] || @config.ttl
    request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl})
    request.reply_to = @options[:reply_to] if @options[:reply_to]
  end

  @@request_sequence += 1

  request.encode!
  subscribe(agent, :reply) unless request.reply_to
  request
end
disconnect() click to toggle source

Disconnects cleanly from the middleware

# File lib/mcollective/client.rb, line 37
def disconnect
  Log.debug("Disconnecting from the middleware")
  @connection.disconnect
end
discover(filter, timeout, limit=0) click to toggle source

Performs a discovery of nodes matching the filter passed returns an array of nodes

An integer limit can be supplied this will have the effect of the discovery being cancelled soon as it reached the requested limit of hosts

# File lib/mcollective/client.rb, line 124
def discover(filter, timeout, limit=0)
  discovered = @discoverer.discover(filter, timeout, limit)
end
discovered_req(body, agent, options=false) click to toggle source
# File lib/mcollective/client.rb, line 242
def discovered_req(body, agent, options=false)
  raise "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
end
display_stats(stats, options=false, caption="stomp call summary") click to toggle source

Prints out the stats returns from req and discovered_req in a nice way

# File lib/mcollective/client.rb, line 247
def display_stats(stats, options=false, caption="stomp call summary")
  options = @options unless options

  if options[:verbose]
    puts("\n---- #{caption} ----")

    if stats[:discovered]
      puts("           Nodes: #{stats[:discovered]} / #{stats[:responses]}")
    else
      puts("           Nodes: #{stats[:responses]}")
    end

    printf("      Start Time: %s\n", Time.at(stats[:starttime]))
    printf("  Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
    printf("      Agent Time: %.2fms\n", stats[:blocktime] * 1000)
    printf("      Total Time: %.2fms\n", stats[:totaltime] * 1000)

  else
    if stats[:discovered]
      printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
    else
      printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
    end
  end

  if stats[:noresponsefrom].size > 0
    puts("\nNo response from:\n")

    stats[:noresponsefrom].each do |c|
      puts if c % 4 == 1
      printf("%30s", c)
    end

    puts
  end
end
receive(requestid = nil) click to toggle source

Blocking call that waits for ever for a message to arrive.

If you give it a requestid this means you’ve previously send a request with that ID and now you just want replies that matches that id, in that case the current connection will just ignore all messages not directed at it and keep waiting for more till it finds a matching message.

# File lib/mcollective/client.rb, line 95
def receive(requestid = nil)
  reply = nil

  begin
    reply = @connection.receive
    reply.type = :reply
    reply.expected_msgid = requestid

    reply.decode!
    unless reply.requestid == requestid
      raise(MsgDoesNotMatchRequestID, "Message reqid #{reply.requestid} does not match our reqid #{requestid}")
    end
  rescue SecurityValidationFailed => e
    Log.warn("Ignoring a message that did not pass security validations")
    retry
  rescue MsgDoesNotMatchRequestID => e
    Log.debug("Ignoring a message for some other client : #{e.message}")
    retry
  end

  reply
end
req(body, agent=nil, options=false, waitfor=0, &block) click to toggle source

Send a request, performs the passed block for each response

times = req(“status”, “mcollectived”, options, client) {|resp|

pp resp

}

It returns a hash of times and timeouts for discovery and total run is taken from the options hash which in turn is generally built using MCollective::Optionparser

# File lib/mcollective/client.rb, line 136
def req(body, agent=nil, options=false, waitfor=0, &block)
  if body.is_a?(Message)
    agent = body.agent
    waitfor = body.discovered_hosts.size || 0
    @options = body.options
  end

  @options = options if options
  threaded = @options[:threaded]
  timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
  request = createreq(body, agent, @options[:filter])
  publish_timeout = @options[:publish_timeout]
  stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
  STDOUT.sync = true
  hosts_responded = 0


  begin
    if threaded
      hosts_responded = threaded_req(request, publish_timeout, timeout, waitfor, &block)
    else
      hosts_responded = unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
    end
  rescue Interrupt => e
  ensure
    unsubscribe(agent, :reply)
  end

  return update_stat(stat, hosts_responded, request.requestid)
end
sendreq(msg, agent, filter = {}) click to toggle source

Sends a request and returns the generated request id, doesn’t wait for responses and doesn’t execute any passed in code blocks for responses

# File lib/mcollective/client.rb, line 44
def sendreq(msg, agent, filter = {})
  request = createreq(msg, agent, filter)

  Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")

  request.publish
  request.requestid
end
start_publisher(request, publish_timeout) click to toggle source

Starts the request publishing routine

# File lib/mcollective/client.rb, line 198
def start_publisher(request, publish_timeout)
  Log.debug("Starting publishing with publish timeout of #{publish_timeout}")
  begin
    Timeout.timeout(publish_timeout) do
      Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
      request.publish
    end
  rescue Timeout::Error => e
    Log.warn("Could not publish all messages. Publishing timed out.")
  end
end
start_receiver(requestid, waitfor, timeout, &block) click to toggle source

Starts the response receiver routine Expected to return the amount of received responses.

# File lib/mcollective/client.rb, line 212
def start_receiver(requestid, waitfor, timeout, &block)
  Log.debug("Starting response receiver with timeout of #{timeout}")
  hosts_responded = 0
  begin
    Timeout.timeout(timeout) do
      begin
        resp = receive(requestid)
        yield resp.payload
        hosts_responded += 1
      end while (waitfor == 0 || hosts_responded < waitfor)
    end
  rescue Timeout::Error => e
    if (waitfor > hosts_responded)
      Log.warn("Could not receive all responses. Expected : #{waitfor}. Received : #{hosts_responded}")
    end
  end

  hosts_responded
end
subscribe(agent, type) click to toggle source
# File lib/mcollective/client.rb, line 70
def subscribe(agent, type)
  unless @subscriptions.include?(agent)
    subscription = Util.make_subscriptions(agent, type, collective)
    Log.debug("Subscribing to #{type} target for agent #{agent}")

    Util.subscribe(subscription)
    @subscriptions[agent] = 1
  end
end
threaded_req(request, publish_timeout, timeout, waitfor, &block) click to toggle source

Starts the client receiver and publisher in threads. This is activated when the ‘threader_client’ configuration option is set.

# File lib/mcollective/client.rb, line 177
def threaded_req(request, publish_timeout, timeout, waitfor, &block)
  Log.debug("Starting threaded client")
  publisher = Thread.new do
    start_publisher(request, publish_timeout)
  end

  # When the client is threaded we add the publishing timeout to
  # the agent timeout so that the receiver doesn't time out before
  # publishing has finished in cases where publish_timeout >= timeout.
  total_timeout = publish_timeout + timeout
  hosts_responded = 0

  receiver = Thread.new do
    hosts_responded = start_receiver(request.requestid, waitfor, total_timeout, &block)
  end

  receiver.join
  hosts_responded
end
unsubscribe(agent, type) click to toggle source
# File lib/mcollective/client.rb, line 80
def unsubscribe(agent, type)
  if @subscriptions.include?(agent)
    subscription = Util.make_subscriptions(agent, type, collective)
    Log.debug("Unsubscribing #{type} target for #{agent}")

    Util.unsubscribe(subscription)
    @subscriptions.delete(agent)
  end
end
unthreaded_req(request, publish_timeout, timeout, waitfor, &block) click to toggle source

Starts the client receiver and publisher unthreaded. This is the default client behaviour.

# File lib/mcollective/client.rb, line 169
def unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
  start_publisher(request, publish_timeout)
  start_receiver(request.requestid, waitfor, timeout, &block)
end
update_stat(stat, hosts_responded, requestid) click to toggle source
# File lib/mcollective/client.rb, line 232
def update_stat(stat, hosts_responded, requestid)
  stat[:totaltime] = Time.now.to_f - stat[:starttime]
  stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
  stat[:responses] = hosts_responded
  stat[:noresponsefrom] = []
  stat[:requestid] = requestid

  @stats = stat
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.