Commit f29c515d authored by Zachary Kent's avatar Zachary Kent

Merge 5.2.x into 6.0.x

parents 82fa815c e73500f6
......@@ -6,14 +6,18 @@ master_running() {
if [ ! -d "/etc/puppetlabs/puppetdb/ssl" ] && [ "$USE_PUPPETSERVER" = true ]; then
/opt/puppetlabs/bin/puppet config set certname "$HOSTNAME"
/opt/puppetlabs/bin/puppet config set server "$PUPPETSERVER_HOSTNAME"
if [ ! -f "/etc/puppetlabs/puppet/ssl/certs/${HOSTNAME}.pem" ] && [ "$USE_PUPPETSERVER" = true ]; then
# if this is our first run, run puppet agent to get certs in place
while ! master_running; do
sleep 1
set -e
/opt/puppetlabs/bin/puppet config set certname "$HOSTNAME"
/opt/puppetlabs/bin/puppet config set server "$PUPPETSERVER_HOSTNAME"
/opt/puppetlabs/bin/puppet agent --verbose --onetime --no-daemonize --waitforcert 120
if [ ! -d "/etc/puppetlabs/puppetdb/ssl" ] && [ "$USE_PUPPETSERVER" = true ]; then
/opt/puppetlabs/server/bin/puppetdb ssl-setup -f
......@@ -58,7 +58,7 @@ class Puppet::Util::Puppetdb::Command
checksum = Digest::SHA1.hexdigest(checksum_payload)
for_whom = " for #{certname}" if certname
params = "checksum=#{checksum}&version=#{version}&certname=#{certname}&command=#{command}&producer-timestamp=#{producer_timestamp_utc.to_i}"
params = "checksum=#{checksum}&version=#{version}&certname=#{certname}&command=#{command}&producer-timestamp=#{producer_timestamp_utc.iso8601(3)}"
response = profile("Submit command HTTP post", [:puppetdb, :command, :submit]) do
Http.action("#{CommandsUrl}?#{params}", :command) do |http_instance, path|
......@@ -4,6 +4,7 @@ $LOAD_PATH.unshift File.join(dir, "../lib")
# don't fail any worse than we already would.
$LOAD_PATH.push File.join(dir, "../../../puppetlabs_spec_helper")
require 'cgi'
require 'rspec'
require 'rspec/expectations'
require 'puppetlabs_spec_helper/puppet_spec_helper'
......@@ -33,6 +34,13 @@ def assert_command_req(expected_payload, actual_payload)
actual_producer_timestamp <=
def assert_valid_producer_ts(path)
_, param_str = path.split "?"
params = CGI::parse(param_str)
return false if params["producer-timestamp"].size != 1
RSpec.configure do |config|
config.before :each do
......@@ -26,11 +26,12 @@ describe Puppet::Util::Puppetdb::Command do
httpok.stubs(:body).returns '{"uuid": "a UUID"}'
http.expects(:post).with() do | path, payload, headers, options |
param_map = CGI::parse(URI(path).query)
param_map['certname'].first.should == 'foo.localdomain' &&
assert_valid_producer_ts(path) &&
param_map['certname'].first.should == 'foo.localdomain' &&
param_map['version'].first.should == '1' &&
param_map['command'].first.should == 'OPEN_SESAME'
options[:compress].should == :gzip
options[:metric_id].should == [:puppetdb, :command, 'OPEN_SESAME']
param_map['command'].first.should == 'OPEN_SESAME' &&
options[:compress] == :gzip &&
options[:metric_id] == [:puppetdb, :command, 'OPEN_SESAME']
......@@ -9,7 +9,8 @@
[puppetlabs.puppetdb.schema :refer [defn-validated]]
[puppetlabs.puppetdb.utils :as utils]
[puppetlabs.kitchensink.core :as kitchensink]
[schema.core :as s]))
[schema.core :as s]
[clj-time.coerce :as c]))
(defn get-metric [base-url metric-name]
(let [url (str (utils/base-url->str base-url)
......@@ -42,7 +43,8 @@
(let [body (json/generate-string payload)
url (str (utils/base-url->str base-url)
(format "?command=%s&version=%s&certname=%s&producer-timestamp=%s"
(str/replace command #" " "_") version certname (System/currentTimeMillis))
(str/replace command #" " "_") version certname
(c/from-long (System/currentTimeMillis)))
(when timeout (format "&secondsToWaitForCompletion=%s" timeout)))]
(http-client/post url {:body body
:throw-exceptions false
......@@ -65,7 +65,7 @@
[puppetlabs.puppetdb.utils :as utils]
[slingshot.slingshot :refer [try+ throw+]]
:refer [command-names supported-command-versions]]
:refer [command-names command-keys supported-command-versions]]
:refer [defservice service-context]]
[schema.core :as s]
......@@ -436,7 +436,8 @@
(defn make-cmd-processed-message [cmd ex]
(conj (select-keys cmd [:id :command :version])
;; :delete? from cmdref is needed to check when a command gets "bashed"
(conj (select-keys cmd [:id :command :version :delete?])
[:producer-timestamp (get-in cmd [:payload :producer_timestamp])])
(when ex
[:exception ex])))
......@@ -492,9 +493,11 @@
"Processes a command ref marked for deletion. This is similar to
processing a non-delete cmdref except different metrics need to be
updated to indicate the difference in command"
[{:keys [command version certname id received] :as cmdref} q scf-write-db response-chan stats]
(process-command-and-respond! cmdref scf-write-db response-chan stats)
(log-command-processed-messsage id received (now) :command-obsolete certname {:obsolete-cmd? true})
[{:keys [command version certname id received] :as cmdref}
q scf-write-db response-chan stats blacklist-config]
(process-command-and-respond! cmdref scf-write-db response-chan stats blacklist-config)
(log-command-processed-messsage id received (now) (command-keys command)
certname {:obsolete-cmd? true})
(queue/ack-command q {:entry (queue/cmdref->entry cmdref)})
(update-counter! :depth command version dec!)
(update-counter! :invalidated command version dec!))
......@@ -584,7 +587,7 @@
(update! (cmd-metric command version :queue-time) q-time)))
(if delete?
(process-delete-cmdref cmdref q scf-write-db response-chan stats)
(process-delete-cmdref cmdref q scf-write-db response-chan stats blacklist-config)
(let [retries (count (:attempts cmdref))]
(process-cmdref cmdref q scf-write-db response-chan stats blacklist-config)
(ns puppetlabs.puppetdb.command.constants)
(ns puppetlabs.puppetdb.command.constants
(:require [clojure.set :as set]))
(def command-names
{:replace-catalog "replace catalog"
......@@ -6,6 +7,8 @@
:deactivate-node "deactivate node"
:store-report "store report"})
(def command-keys (set/map-invert command-names))
(defn- version-range [min-version max-version]
(set (range min-version (inc max-version))))
......@@ -214,7 +214,7 @@
(get "secondsToWaitForCompletion")
(* 1000))
submit-params (select-keys params ["certname" "command" "version"])
submit-params (select-keys params ["certname" "command" "version" "producer-timestamp"])
submit-params (if-let [v (submit-params "version")]
(update submit-params "version" str)
......@@ -226,7 +226,7 @@
(get submit-params "command")
(Integer/parseInt (get submit-params "version"))
(get submit-params "certname")
(pdbtime/from-string (get submit-params "producer-ts"))
(get submit-params "producer-timestamp")
(stream-with-max-check body max-command-size)
......@@ -47,7 +47,7 @@
[puppetlabs.puppetdb.utils :as utils]
[puppetlabs.puppetdb.time :as pt]
[ :refer [get-service]]
[ :refer [get-service app-context]]
[clojure.core.async :as async]
[puppetlabs.kitchensink.core :as ks]
[clojure.string :as str]
......@@ -60,7 +60,9 @@
:refer [service-context]]
[ :refer [mk-pool scheduled-jobs]]
[puppetlabs.puppetdb.testutils :as tu])
[puppetlabs.puppetdb.testutils :as tu]
[puppetlabs.puppetdb.client :as client]
[puppetlabs.puppetdb.threadpool :as gtp])
(:import [java.nio.file Files]
[java.util.concurrent TimeUnit]
[org.joda.time DateTime DateTimeZone]))
......@@ -1362,6 +1364,48 @@
(defn- get-config []
(conf/get-config (get-service svc-utils/*server* :DefaultedConfig)))
(deftest bashed-commands-handled-correctly
(let [real-dochan gtp/dochan
go-ahead-and-execute (promise)]
(with-redefs [gtp/dochan (fn [& args]
(apply real-dochan args))]
(let [{pdb-host :host pdb-port :port
:or {pdb-host "" pdb-port 8080}} (:jetty (get-config))
base-url (utils/pdb-cmd-base-url pdb-host pdb-port :v1)
facts {:certname ""
:environment "test"
:producer_timestamp (str (now))
:producer "puppetserver"
:values {:foo "1"
:bar "2"}
:package_inventory [["openssl" "1.1.0e-1" "apt"]]}
dispatcher (get-service svc-utils/*server* :PuppetDBCommandDispatcher)
response-mult (response-mult dispatcher)
response-watch-ch (async/chan 10)
_ (async/tap response-mult response-watch-ch)]
;; submit two replace facts commands for the same certname
;; this should cause one of the commands to get "bashed" in the queue
(client/submit-facts base-url "" 5 facts)
;; make sure the second command has a later timestamp
(Thread/sleep 200)
(client/submit-facts base-url "" 5 facts)
;; allow pdb to process messages
(deliver go-ahead-and-execute true)
;; grab reponse maps from commands above, timeout if not received
(let [[val _] (async/alts!! [(async/into
#{} (async/take 2 response-watch-ch))
(async/timeout tu/default-timeout-ms)])]
(when-not val
(throw (Exception. "timed out waiting for response-chan")))
;; check the first command was "bashed"
(is (= #{{:id 0, :delete? true} {:id 1, :delete? nil}}
(set (map #(select-keys % [:id :delete?]) val))))))))))
(deftest command-service-stats
(let [pdb (get-service svc-utils/*server* :PuppetDBServer)
......@@ -304,7 +304,8 @@
:body "more than ten characters"
:params {"command" "replace catalog"
"version" 4
"certname" ""}}
"certname" ""
"producer-timestamp" "2018-11-1"}}
wait-req (assoc-in req [:params "secondsToWaitForCompletion"] "0.001")]
;; These cases differ because we want to skip the processing
;; via timeout in the "success" case.
......@@ -313,11 +314,16 @@
(is (= http/status-ok
(:status (no-max-app req)))))
(is (= "more than ten characters"
(->> (async/<!! command-chan)
(stock/stream q)
(let [test-cmdref (async/<!! command-chan)]
(is (= "more than ten characters"
(->> test-cmdref
(stock/stream q)
;; test producer-timestamp is included in cmdref
(is (= "2018-11-01T00:00:00.000Z"
(str (:producer-ts test-cmdref)))))
(testing "(with timeout),"
(testing "when disabled, allows larger size"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment