Commit eae9aa99 authored by Ryan Senior's avatar Ryan Senior

(PDB-3108) Add producer-timestamp to the command "header"

This commit adds producer-timestmap to the POST parameters of command
requests.
parent 3e8ad4d6
......@@ -9,8 +9,9 @@ class Puppet::Resource::Catalog::Puppetdb < Puppet::Indirector::REST
def save(request)
profile("catalog#save", [:puppetdb, :catalog, :save, request.key]) do
catalog = munge_catalog(request.instance, extract_extra_request_data(request))
submit_command(request.key, catalog, CommandReplaceCatalog, 9)
current_time = Time.now
catalog = munge_catalog(request.instance, current_time, extract_extra_request_data(request))
submit_command(request.key, catalog, CommandReplaceCatalog, 9, current_time.clone.utc)
end
end
......@@ -31,7 +32,7 @@ class Puppet::Resource::Catalog::Puppetdb < Puppet::Indirector::REST
hash
end
def munge_catalog(catalog, extra_request_data = {})
def munge_catalog(catalog, producer_timestamp, extra_request_data = {})
profile("Munge catalog", [:puppetdb, :catalog, :munge]) do
data = profile("Convert catalog to JSON data hash", [:puppetdb, :catalog, :convert_to_hash]) do
catalog.to_data_hash
......@@ -51,7 +52,7 @@ class Puppet::Resource::Catalog::Puppetdb < Puppet::Indirector::REST
filter_keys(data)
add_transaction_uuid(data, extra_request_data[:transaction_uuid])
add_environment(data, extra_request_data[:environment])
add_producer_timestamp(data, extra_request_data[:producer_timestamp])
add_producer_timestamp(data, producer_timestamp)
add_producer(data, Puppet[:node_name_value])
data
......@@ -99,7 +100,7 @@ class Puppet::Resource::Catalog::Puppetdb < Puppet::Indirector::REST
# @return [Hash] returns original hash augmented with producer_timestamp
# @api private
def add_producer_timestamp(hash, producer_timestamp)
hash['producer_timestamp'] = Puppet::Util::Puppetdb.to_wire_time(Time.now)
hash['producer_timestamp'] = Puppet::Util::Puppetdb.to_wire_time(producer_timestamp)
hash
end
......
......@@ -18,6 +18,9 @@ class Puppet::Node::Facts::Puppetdb < Puppet::Indirector::REST
def save(request)
profile("facts#save", [:puppetdb, :facts, :save, request.key]) do
current_time = Time.now
payload = profile("Encode facts command submission payload",
[:puppetdb, :facts, :encode]) do
facts = request.instance.dup
......@@ -30,12 +33,12 @@ class Puppet::Node::Facts::Puppetdb < Puppet::Indirector::REST
# when we attempt to use ActiveSupport 2.3.16 on RHEL 5 with
# legacy storeconfigs.
"environment" => request.options[:environment] || request.environment.to_s,
"producer_timestamp" => Puppet::Util::Puppetdb.to_wire_time(Time.now),
"producer_timestamp" => Puppet::Util::Puppetdb.to_wire_time(current_time),
"producer" => Puppet[:node_name_value]
}
end
submit_command(request.key, payload, CommandReplaceFacts, 5)
submit_command(request.key, payload, CommandReplaceFacts, 5, current_time.clone.utc)
end
end
......
......@@ -12,8 +12,10 @@ class Puppet::Node::Puppetdb < Puppet::Indirector::REST
end
def destroy(request)
current_time = Time.now
payload = { :certname => request.key,
:producer_timestamp => Puppet::Util::Puppetdb.to_wire_time(Time.now) }
submit_command(request.key, payload, CommandDeactivateNode, 3)
:producer_timestamp => Puppet::Util::Puppetdb.to_wire_time(current_time) }
submit_command(request.key, payload, CommandDeactivateNode, 3, current_time.clone.utc)
end
end
......@@ -19,7 +19,9 @@ Puppet::Reports.register_report(:puppetdb) do
# @return [void]
def process
profile("report#process", [:puppetdb, :report, :process]) do
submit_command(self.host, report_to_hash, CommandStoreReport, 8)
current_time = Time.now
report_hash = report_to_hash(current_time)
submit_command(self.host, report_hash, CommandStoreReport, 8, current_time.utc)
end
nil
......@@ -30,7 +32,7 @@ Puppet::Reports.register_report(:puppetdb) do
#
# @return Hash[<String, Object>]
# @api private
def report_to_hash
def report_to_hash(producer_timestamp)
profile("Convert report to wire format hash",
[:puppetdb, :report, :convert_to_wire_format_hash]) do
if environment.nil?
......@@ -53,7 +55,7 @@ Puppet::Reports.register_report(:puppetdb) do
"puppet_version" => puppet_version,
"report_format" => report_format,
"configuration_version" => configuration_version.to_s,
"producer_timestamp" => Puppet::Util::Puppetdb.to_wire_time(Time.now),
"producer_timestamp" => Puppet::Util::Puppetdb.to_wire_time(producer_timestamp),
"start_time" => Puppet::Util::Puppetdb.to_wire_time(time),
"end_time" => Puppet::Util::Puppetdb.to_wire_time(time + run_duration),
"environment" => environment,
......
......@@ -55,10 +55,10 @@ module Puppet::Util::Puppetdb
# @param command_name [String] name of command
# @param version [Number] version number of command
# @return [Hash <String, String>]
def submit_command(certname, payload, command_name, version)
def submit_command(certname, payload, command_name, version, producer_timestamp_utc)
profile("Submitted command '#{command_name}' version '#{version}'",
[:puppetdb, :command, :submit, command_name, version]) do
command = Puppet::Util::Puppetdb::Command.new(command_name, version, certname, payload)
command = Puppet::Util::Puppetdb::Command.new(command_name, version, certname, producer_timestamp_utc, payload)
command.submit
end
end
......
......@@ -23,7 +23,7 @@ class Puppet::Util::Puppetdb::Command
# @param payload Object the payload of the command. This object should be a
# primitive (numeric type, string, array, or hash) that is natively supported
# by JSON serialization / deserialization libraries.
def initialize(command, version, certname, payload)
def initialize(command, version, certname, producer_timestamp_utc, payload)
profile("Format payload", [:puppetdb, :payload, :format]) do
@checksum_payload = Puppet::Util::Puppetdb::CharEncoding.utf8_string({
:command => command,
......@@ -45,10 +45,11 @@ class Puppet::Util::Puppetdb::Command
@command = Puppet::Util::Puppetdb::CharEncoding.coerce_to_utf8(command).gsub(" ", "_")
@version = version
@certname = Puppet::Util::Puppetdb::CharEncoding.coerce_to_utf8(certname)
@producer_timestamp_utc = producer_timestamp_utc
@payload = Puppet::Util::Puppetdb::CharEncoding.coerce_to_utf8(payload.to_pson)
end
attr_reader :command, :version, :certname, :payload, :checksum_payload
attr_reader :command, :version, :certname, :producer_timestamp_utc, :payload, :checksum_payload
# Submit the command, returning the result hash.
#
......@@ -57,7 +58,7 @@ class Puppet::Util::Puppetdb::Command
checksum = Digest::SHA1.hexdigest(checksum_payload)
for_whom = " for #{certname}" if certname
params = "checksum=#{checksum}&version=#{version}&certname=#{certname}&command=#{command}"
params = "checksum=#{checksum}&version=#{version}&certname=#{certname}&command=#{command}&producer-timestamp=#{producer_timestamp_utc.to_i}"
begin
response = profile("Submit command HTTP post", [:puppetdb, :command, :submit]) do
Http.action("#{CommandsUrl}?#{params}", :command) do |http_instance, path|
......
......@@ -24,6 +24,14 @@ def extract_producer_timestamp(command)
DateTime.parse(command["producer_timestamp"]).to_time.to_i
end
def assert_command_req(expected_payload, actual_payload)
req = JSON.parse(actual_payload)
actual_producer_timestamp = extract_producer_timestamp(req)
req.delete("producer_timestamp")
req == expected_payload &&
actual_producer_timestamp <= Time.now.to_i
end
RSpec.configure do |config|
config.before :each do
......
......@@ -40,15 +40,10 @@ describe Puppet::Resource::Catalog::Puppetdb do
end
it "should POST the catalog command as a JSON string" do
before_test_time = Time.now
command_payload = subject.munge_catalog(catalog, options)
command_payload = subject.munge_catalog(catalog, Time.now.utc, options)
http.expects(:post).with do |uri, body, headers|
req = JSON.parse(body)
actual_producer_timestamp = extract_producer_timestamp(req)
req.delete("producer_timestamp")
command_payload.delete("producer_timestamp")
req == command_payload &&
actual_producer_timestamp <= Time.now.to_i
assert_command_req(command_payload, body)
end.returns response
save
......@@ -367,7 +362,7 @@ describe Puppet::Resource::Catalog::Puppetdb do
exec{'/bin/true': subscribe => Exec["foo\nbar"]}
MANIFEST
expect { subject.munge_catalog(catalog) }.not_to raise_error
expect { subject.munge_catalog(catalog, Time.now.utc) }.not_to raise_error
end
describe "exported resources" do
......@@ -396,7 +391,7 @@ describe Puppet::Resource::Catalog::Puppetdb do
Notify <<| |>>
MANIFEST
result = subject.munge_catalog(catalog)
result = subject.munge_catalog(catalog, Time.now.utc)
result['edges'].should include(edge)
end
......@@ -412,7 +407,7 @@ describe Puppet::Resource::Catalog::Puppetdb do
Notify <<| |>>
MANIFEST
result = subject.munge_catalog(catalog)
result = subject.munge_catalog(catalog, Time.now.utc)
result['edges'].should include(edge)
end
......@@ -427,7 +422,7 @@ describe Puppet::Resource::Catalog::Puppetdb do
MANIFEST
expect do
subject.munge_catalog(catalog)
subject.munge_catalog(catalog, Time.now.utc)
end.to raise_error(Puppet::Error, "Invalid relationship: Notify[source] { before => Notify[target] }, because Notify[target] is exported but not collected")
end
......@@ -440,7 +435,7 @@ describe Puppet::Resource::Catalog::Puppetdb do
notify { target: }
MANIFEST
result = subject.munge_catalog(catalog)
result = subject.munge_catalog(catalog, Time.now.utc)
result['edges'].should_not include(edge)
end
......@@ -466,7 +461,7 @@ describe Puppet::Resource::Catalog::Puppetdb do
Notify <| |>
MANIFEST
result = subject.munge_catalog(catalog)
result = subject.munge_catalog(catalog, Time.now.utc)
result['edges'].should include(edge)
end
......@@ -482,7 +477,7 @@ describe Puppet::Resource::Catalog::Puppetdb do
Notify <| |>
MANIFEST
result = subject.munge_catalog(catalog)
result = subject.munge_catalog(catalog, Time.now.utc)
result['edges'].should include(edge)
end
......@@ -498,7 +493,7 @@ describe Puppet::Resource::Catalog::Puppetdb do
realize Notify[target]
MANIFEST
result = subject.munge_catalog(catalog)
result = subject.munge_catalog(catalog, Time.now.utc)
result['edges'].should include(edge)
end
......@@ -514,7 +509,7 @@ describe Puppet::Resource::Catalog::Puppetdb do
realize Notify[source]
MANIFEST
result = subject.munge_catalog(catalog)
result = subject.munge_catalog(catalog, Time.now.utc)
result['edges'].should include(edge)
end
......@@ -529,7 +524,7 @@ describe Puppet::Resource::Catalog::Puppetdb do
MANIFEST
expect do
subject.munge_catalog(catalog)
subject.munge_catalog(catalog, Time.now.utc)
end.to raise_error(Puppet::Error, "Invalid relationship: Notify[source] { before => Notify[target] }, because Notify[target] doesn't seem to be in the catalog")
end
......@@ -542,7 +537,7 @@ describe Puppet::Resource::Catalog::Puppetdb do
notify { target: }
MANIFEST
result = subject.munge_catalog(catalog)
result = subject.munge_catalog(catalog, Time.now.utc)
result['edges'].should_not include(edge)
end
......@@ -616,7 +611,7 @@ describe Puppet::Resource::Catalog::Puppetdb do
resource[:require] = 'Notify[completely_different]'
Puppet[:code] = [resource, other_resource].map(&:to_manifest).join
result = subject.munge_catalog(catalog)
result = subject.munge_catalog(catalog, Time.now.utc)
edge = {'source' => {'type' => 'Notify', 'title' => 'noone'},
'target' => {'type' => 'Notify', 'title' => 'anyone'},
......@@ -631,7 +626,7 @@ describe Puppet::Resource::Catalog::Puppetdb do
other_resource = Puppet::Resource.new(:file, resource_title)
resource[:require] = "File[#{require_title}]"
Puppet[:code] = [resource, other_resource].map(&:to_manifest).join
result = subject.munge_catalog(catalog)
result = subject.munge_catalog(catalog, Time.now.utc)
edge = {'source' => {'type' => 'File', 'title' => resource_title},
'target' => {'type' => 'Notify', 'title' => 'anyone'},
......@@ -662,7 +657,7 @@ describe Puppet::Resource::Catalog::Puppetdb do
resource[:require] = 'Exec[completely_different]'
Puppet[:code] = [resource, other_resource].map(&:to_manifest).join
result = subject.munge_catalog(catalog)
result = subject.munge_catalog(catalog, Time.now.utc)
edge = {'source' => {'type' => 'Exec', 'title' => 'noone'},
'target' => {'type' => 'Notify', 'title' => 'anyone'},
......@@ -676,7 +671,7 @@ describe Puppet::Resource::Catalog::Puppetdb do
@notify { something: }
MANIFEST
result = subject.munge_catalog(catalog)
result = subject.munge_catalog(catalog, Time.now.utc)
result['resources'].each do |res|
[res['type'], res['title']].should_not == ['Notify', 'something']
......@@ -684,7 +679,7 @@ describe Puppet::Resource::Catalog::Puppetdb do
end
it "should have the correct set of keys" do
result = subject.munge_catalog(catalog)
result = subject.munge_catalog(catalog, Time.now.utc)
result.keys.should =~ ['certname', 'version', 'edges', 'resources',
'transaction_uuid', 'environment', 'producer_timestamp', "code_id",
......
......@@ -52,13 +52,7 @@ describe Puppet::Node::Facts::Puppetdb do
}
http.expects(:post).with do |uri, body, headers|
req = JSON.parse(body)
actual_producer_timestamp = extract_producer_timestamp(req)
req.delete("producer_timestamp")
req == payload &&
actual_producer_timestamp <= Time.now.to_i
assert_command_req(payload, body)
end.returns response
save
......
......@@ -34,7 +34,7 @@ describe processor do
httpok.stubs(:body).returns '{"uuid": "a UUID"}'
subject.stubs(:run_duration).returns(10)
expected_body = subject.send(:report_to_hash).to_json
expected_body = subject.report_to_hash(Time.now.utc).to_json
Puppet::Network::HttpPool.expects(:http_instance).returns(http)
http.expects(:post).with {|path, body, headers|
......@@ -75,7 +75,7 @@ describe processor do
subject.catalog_uuid = 'bde432'
end
result = subject.send(:report_to_hash)
result = subject.report_to_hash(Time.now.utc)
result["transaction_uuid"].should == 'abc123'
# This won't be defined on < Puppet 4.3.3
......@@ -90,7 +90,7 @@ describe processor do
if defined?(subject.code_id) then
subject.code_id = 'bde432'
end
result = subject.send(:report_to_hash)
result = subject.report_to_hash(Time.now.utc)
if defined?(subject.code_id) then
result["code_id"].should == 'bde432'
else
......@@ -100,7 +100,7 @@ describe processor do
it "should include the producer or nil" do
Puppet[:node_name_value] = "foo"
result = subject.send(:report_to_hash)
result = subject.report_to_hash(Time.now.utc)
result["producer"].should == "foo"
end
......@@ -108,7 +108,7 @@ describe processor do
if defined?(subject.noop_pending) then
subject.noop_pending = false
end
result = subject.send(:report_to_hash)
result = subject.report_to_hash(Time.now.utc)
if defined?(subject.noop_pending) then
result["noop_pending"].should == false
else
......@@ -120,7 +120,7 @@ describe processor do
if defined?(subject.corrective_change) then
subject.stubs(:corrective_change).returns(false)
end
result = subject.send(:report_to_hash)
result = subject.report_to_hash(Time.now.utc)
if defined?(subject.corrective_change) then
result["corrective_change"].should == false
else
......@@ -132,7 +132,7 @@ describe processor do
if defined?(subject.cached_catalog_status) then
subject.cached_catalog_status = 'not_used'
end
result = subject.send(:report_to_hash)
result = subject.report_to_hash(Time.now.utc)
if defined?(subject.cached_catalog_status) then
result["cached_catalog_status"].should == 'not_used'
else
......@@ -151,7 +151,7 @@ describe processor do
event.status = "noop"
status.add_event(event)
end
result = subject.send(:report_to_hash)
result = subject.report_to_hash(Time.now.utc)
result["noop"].should == true
end
end
......@@ -167,7 +167,7 @@ describe processor do
event.status = "success"
status.add_event(event)
end
result = subject.send(:report_to_hash)
result = subject.report_to_hash(Time.now.utc)
result["noop"].should == false
end
end
......@@ -182,7 +182,7 @@ describe processor do
end
it "should use run_duration to calculate the end_time" do
result = subject.send(:report_to_hash)
result = subject.report_to_hash(Time.now.utc)
duration = Time.parse(result["end_time"]) - Time.parse(result["start_time"])
duration.should == subject.send(:run_duration)
end
......@@ -195,7 +195,7 @@ describe processor do
context "resource without events" do
it "should not include the resource" do
result = subject.send(:report_to_hash)
result = subject.report_to_hash(Time.now.utc)
# the server will populate the report id, so we validate that the
# client doesn't include one
result.has_key?("report").should be_falsey
......@@ -221,7 +221,7 @@ describe processor do
end
status.add_event(event)
result = subject.send(:report_to_hash)
result = subject.report_to_hash(Time.now.utc)
result["resources"].length.should == 1
res = result["resources"][0]
......@@ -253,7 +253,7 @@ describe processor do
context "skipped resource status" do
it "should include the resource" do
status.skipped = true
result = subject.send(:report_to_hash)
result = subject.report_to_hash(Time.now.utc)
result["resources"].length.should == 1
resource = result["resources"][0]
......@@ -271,7 +271,7 @@ describe processor do
context "with no events" do
it "should have no events" do
result = subject.send(:report_to_hash)
result = subject.report_to_hash(Time.now.utc)
result["resources"].length.should == 0
end
end
......@@ -285,7 +285,7 @@ describe processor do
event.message = "barmessage"
status.add_event(event)
result = subject.send(:report_to_hash)
result = subject.report_to_hash(Time.now.utc)
result["resources"].length.should == 1
resource = result["resources"][0]
resource["resource_type"].should == "Foo"
......@@ -335,7 +335,7 @@ describe processor do
context "with an unchanged resource" do
it "should include the actual event" do
result = subject.send(:report_to_hash)
result = subject.report_to_hash(Time.now.utc)
unchanged_resources = result["resources"].select { |res| res["events"].empty? and ! (res["skipped"])}
unchanged_resources.length.should == 1
resource = unchanged_resources[0]
......
......@@ -9,7 +9,8 @@ describe Puppet::Util::Puppetdb::Command do
let(:payload) { {'resistance' => 'futile', 'opinion' => 'irrelevant'} }
let(:subject) { described_class.new("OPEN SESAME", 1,
'foo.localdomain', payload) }
'foo.localdomain',
Time.now.utc, payload) }
describe "#submit" do
......@@ -66,7 +67,7 @@ describe Puppet::Util::Puppetdb::Command do
it "should not warn when the the string contains valid UTF-8 characters" do
Puppet.expects(:warning).never
cmd = described_class.new("command-1", 1, "foo.localdomain", {"foo" => "\u2192"})
cmd = described_class.new("command-1", 1, "foo.localdomain", Time.now.utc, {"foo" => "\u2192"})
cmd.payload.include?("\u2192").should be_truthy
end
......@@ -74,7 +75,7 @@ describe Puppet::Util::Puppetdb::Command do
it "should warn when a command payload includes non-ascii UTF-8 characters" do
Puppet.expects(:warning).with {|msg| msg =~ /Error encoding a 'command-1' command for host 'foo.localdomain' ignoring invalid UTF-8 byte sequences/}
cmd = described_class.new("command-1", 1, "foo.localdomain", {"foo" => [192].pack('c*')})
cmd = described_class.new("command-1", 1, "foo.localdomain", Time.now.utc, {"foo" => [192].pack('c*')})
cmd.payload.include?("\ufffd").should be_truthy
end
......@@ -98,7 +99,7 @@ describe Puppet::Util::Puppetdb::Command do
msg =~ Regexp.new(Regexp.quote('"command":"command-1","version":1,"certname":"foo.localdomain","payload":{"foo"')) &&
msg =~ /1 invalid\/undefined/
end
cmd = described_class.new("command-1", 1, "foo.localdomain", {"foo" => [192].pack('c*')})
cmd = described_class.new("command-1", 1, "foo.localdomain", Time.now.utc, {"foo" => [192].pack('c*')})
cmd.payload.include?("\ufffd").should be_truthy
end
end
......
......@@ -21,7 +21,7 @@ describe Puppet::Util::Puppetdb do
describe "#submit_command" do
let(:payload) { {'resistance' => 'futile', 'opinion' => 'irrelevant'} }
let(:command1) { Puppet::Util::Puppetdb::Command.new("OPEN SESAME", 1, 'foo.localdomain',
let(:command1) { Puppet::Util::Puppetdb::Command.new("OPEN SESAME", 1, 'foo.localdomain', Time.now.utc,
payload.merge(:uniqueprop => "command1")) }
it "should submit the command" do
......@@ -32,6 +32,7 @@ describe Puppet::Util::Puppetdb do
subject.submit_command(command1.certname,
command1.payload,
command1.command,
command1.producer_timestamp_utc,
command1.version)
end
......
......@@ -31,8 +31,8 @@
timeout]
(let [body (json/generate-string payload)
url (str (utils/base-url->str base-url)
(format "?command=%s&version=%s&certname=%s"
(str/replace command #" " "_") version certname)
(format "?command=%s&version=%s&certname=%s&producer-timestamp=%s"
(str/replace command #" " "_") version certname (System/currentTimeMillis))
(when timeout (format "&secondsToWaitForCompletion=%s" timeout)))]
(http-client/post url {:body body
:throw-exceptions false
......
......@@ -117,7 +117,8 @@
(s/required-key "certname") s/Str
(s/required-key "received") s/Str
(s/optional-key "secondsToWaitForCompletion") s/Str
(s/optional-key "checksum") s/Str}
(s/optional-key "checksum") s/Str
(s/optional-key "producer-timestamp") s/Str}
:body java.io.InputStream
s/Any s/Any})
......@@ -261,7 +262,7 @@
add-received-param ;; must be (temporally) after validate-query-params
;; The checksum here is vestigial. It is no longer checked
(mid/validate-query-params {:optional ["checksum" "secondsToWaitForCompletion"
"certname" "command" "version"]})
"certname" "command" "version" "producer-timestamp"]})
mid/verify-accepts-json
(mid/verify-content-type ["application/json"])
(mid/fail-when-payload-too-large reject-large-commands? max-command-size)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment