Commit 33c63ffc authored by Jeremy Barlow's avatar Jeremy Barlow

(PDB-2640) Add support for gzip-compressing commands via terminus

This commit adds support for gzip-compressing commands which the
PuppetDB terminus sends out.  When stockpile persists the commands to
disk, it stores the gzipped json bytes with a .json.gz file extension.
Command payloads are decompressed and parsed from JSON when processed
back out of the queue.

The terminus does not gzip-compress the command content itself.
Instead, to minimize memory usage somewhat and realize a bit better
performance, it passes through its intent for the implementation
upstream from the Puppet::Network::HttpPool.http_instance to perform
the gzip compression.  Corresponding work in Puppet Server and its
upstream clj-http-client library enables the payload to be compressed.
parent 1c98f73e
......@@ -62,7 +62,7 @@ class Puppet::Util::Puppetdb::Command
begin
response = profile("Submit command HTTP post", [:puppetdb, :command, :submit]) do
Http.action("#{CommandsUrl}?#{params}", :command) do |http_instance, path|
http_instance.post(path, payload, headers)
http_instance.post(path, payload, headers, {:compress => :gzip})
end
end
......
......@@ -24,12 +24,12 @@ describe Puppet::Util::Puppetdb::Command do
it "should issue the HTTP POST and log success" do
httpok.stubs(:body).returns '{"uuid": "a UUID"}'
http.expects(:post).with() do | path, payload, headers |
http.expects(:post).with() do | path, payload, headers, options |
param_map = CGI::parse(URI(path).query)
param_map['certname'].first.should == 'foo.localdomain' &&
param_map['version'].first.should == '1' &&
param_map['command'].first.should == 'OPEN_SESAME'
options[:compress].should == :gzip
end.returns(httpok)
subject.submit
......
......@@ -130,7 +130,7 @@
;; body. Rather than do that, nil is being passed. This
;; is still correct, the only downside is comands won't
;; be overwritten in the queue by newer commands.
(enqueue-fn command version certname nil payload)
(enqueue-fn command version certname nil payload "")
(catch Exception ex
(log/error ex
......
......@@ -246,7 +246,7 @@
[q
command-chan
^Semaphore write-semaphore
{:keys [command certname command-stream] :as command-req} :- queue/command-req-schema]
{:keys [command certname command-stream compression] :as command-req} :- queue/command-req-schema]
(try
(.acquire write-semaphore)
(time! (get @metrics :message-persistence-time)
......@@ -407,8 +407,8 @@
(defprotocol PuppetDBCommandDispatcher
(enqueue-command
[this command version certname producer-ts payload]
[this command version certname producer-ts payload command-callback]
[this command version certname producer-ts payload compression]
[this command version certname producer-ts payload compression command-callback]
"Submits the command for processing, and then returns its unique id.")
(stats [this]
......@@ -660,16 +660,16 @@
(stats [this]
@(:stats (service-context this)))
(enqueue-command [this command version certname producer-ts command-stream]
(enqueue-command this command version certname producer-ts command-stream identity))
(enqueue-command [this command version certname producer-ts command-stream compression]
(enqueue-command this command version certname producer-ts command-stream compression identity))
(enqueue-command [this command version certname producer-ts command-stream command-callback]
(enqueue-command [this command version certname producer-ts command-stream compression command-callback]
(let [config (get-config)
q (:q (shared-globals))
command-chan (:command-chan (shared-globals))
write-semaphore (:write-semaphore (service-context this))
command (if (string? command) command (command-names command))
command-req (queue/create-command-req command version certname producer-ts command-callback command-stream)
command-req (queue/create-command-req command version certname producer-ts compression command-callback command-stream)
result (do-enqueue-command q command-chan write-semaphore command-req)]
;; Obviously assumes that if do-* doesn't throw, msg is in
(inc-cmd-depth command version)
......
......@@ -204,7 +204,8 @@
{:producer-ts nil
:command "unknown"
:version 0
:certname digest})
:certname digest
:compression ""})
cmd-dest (.resolve path (str id \- metadata))]
(Files/write cmd-dest bytes (oopts []))
(let [info-dest (store-failed-command-info id metadata "unknown"
......
......@@ -27,6 +27,10 @@
"store report" 5
"deactivate node" 3})
(defn content-encoding->file-extension
[encoding]
(get {"gzip" "gz"} encoding ""))
(def valid-commands-str (str/join ", " (sort (vals command-names))))
(defn- validate-command-version
......@@ -203,7 +207,7 @@
(defn- enqueue-command-handler
"Enqueues the command in request and returns a UUID"
[enqueue-fn max-command-size]
(fn [{:keys [body params] :as request}]
(fn [{:keys [body params headers]}]
;; For now body will be in-memory, but eventually may be a stream.
(try+
(let [uuid (kitchensink/uuid)
......@@ -215,6 +219,8 @@
submit-params (if-let [v (submit-params "version")]
(update submit-params "version" str)
submit-params)
compression (content-encoding->file-extension
(get headers "content-encoding"))
;; Replace read-body when our queue supports streaming
do-submit (fn [command-callback]
(enqueue-fn
......@@ -223,6 +229,7 @@
(get submit-params "certname")
(pdbtime/from-string (get submit-params "producer-ts"))
(stream-with-max-check body max-command-size)
compression
command-callback))]
(if (some-> completion-timeout-ms pos?)
......@@ -265,6 +272,7 @@
"certname" "command" "version" "producer-timestamp"]})
mid/verify-accepts-json
(mid/verify-content-type ["application/json"])
(mid/verify-content-encoding ["gzip" "identity"])
(mid/fail-when-payload-too-large reject-large-commands? max-command-size)
(mid/wrap-with-metrics (atom {}) http/leading-uris)
(mid/wrap-with-globals get-shared-globals)))
......@@ -53,6 +53,7 @@
command-versions]
(let [path (.getName tar-entry)
[command-type certname] (command-matcher path)
compression ""
command-fn' (fn [command-kwd command-version]
(command-fn command-kwd
command-version
......@@ -62,7 +63,8 @@
utils/read-json-content
json/generate-string
(.getBytes "UTF-8")
java.io.ByteArrayInputStream.)))]
java.io.ByteArrayInputStream.)
compression))]
(case command-type
"catalogs"
(do
......
......@@ -115,6 +115,21 @@
(http/error-response (tru "must accept {0}" content-type)
http/status-not-acceptable))))
(defn verify-content-encoding
"Verification for the specified list of content-encodings."
[app content-encodings]
{:pre [(coll? content-encodings)
(every? string? content-encodings)]}
(fn [{:keys [headers request-method] :as req}]
(let [content-encoding (headers "content-encoding")]
(if (or (not= request-method :post)
(empty? content-encoding)
(some #{content-encoding} content-encodings))
(app req)
(http/error-response (tru "content encoding {0} not supported"
content-encoding)
http/status-unsupported-type)))))
(defn verify-content-type
"Verification for the specified list of content-types."
[app content-types]
......
......@@ -3,7 +3,8 @@
[java.io InputStreamReader BufferedReader InputStream]
[java.util TreeMap HashMap]
[java.nio.file Files LinkOption]
[java.nio.file.attribute FileAttribute])
[java.nio.file.attribute FileAttribute]
(org.apache.commons.compress.compressors.gzip GzipCompressorInputStream))
(:require [clojure.string :as str :refer [re-quote-replacement]]
[puppetlabs.stockpile.queue :as stock]
[clj-time.coerce :as tcoerce]
......@@ -132,18 +133,22 @@
certname if the certname is long or contains filesystem special characters."
([] (metadata-serializer puppetdb-command->metadata-command))
([puppetdb-command->metadata-command]
(fn [received {:keys [producer-ts command version certname]}]
(fn [received {:keys [producer-ts command version certname compression]}]
(when-not (puppetdb-command->metadata-command command)
(throw (IllegalArgumentException. (trs "unknown command ''{0}''" command))))
(let [certname (or certname "unknown-host")
received+producer-time (encode-command-time received producer-ts)
short-command (puppetdb-command->metadata-command command)
safe-certname (embeddable-certname received short-command version certname)]
(if (= safe-certname certname)
(format "%s_%s_%d_%s.json" received+producer-time short-command version safe-certname)
(let [name-hash (kitchensink/utf8-string->sha1 certname)]
(format "%s_%s_%d_%s_%s.json"
received+producer-time short-command version safe-certname name-hash)))))))
safe-certname (embeddable-certname received short-command version certname)
name (if (= safe-certname certname)
safe-certname
(format "%s_%s"
safe-certname
(kitchensink/utf8-string->sha1 certname)))
extension (str "json" (if (not-empty compression)
(str "." compression)))]
(format "%s_%s_%d_%s.%s"
received+producer-time short-command version name extension)))))
(def serialize-metadata (metadata-serializer))
......@@ -151,7 +156,7 @@
(re-pattern (str
"([0-9]+)([+|-][0-9]+)?_"
(match-any-of valid-commands)
"_([0-9]+)_(.*)\\.json")))
"_([0-9]+)_(.*)\\.json(\\..*)?")))
(defn metadata-parser
"Given an (optional) map between the command names that appear in metadata
......@@ -167,7 +172,7 @@
(let [rx (metadata-rx (keys metadata-command->puppetdb-command))
md-cmd->pdb-cmd metadata-command->puppetdb-command]
(fn [s]
(when-let [[_ received-stamp producer-offset md-command version certname] (re-matches rx s)]
(when-let [[_ received-stamp producer-offset md-command version certname compression] (re-matches rx s)]
(let [received-time-long (Long/parseLong received-stamp)
producer-offset (and producer-offset (Long/parseLong producer-offset))]
(and certname
......@@ -179,7 +184,10 @@
tcoerce/from-long)
:version (Long/parseLong version)
:command (get md-cmd->pdb-cmd md-command "unknown")
:certname certname})))))))
:certname certname
:compression (if (nil? compression)
""
(subs compression 1))})))))))
(def parse-metadata (metadata-parser))
......@@ -190,7 +198,8 @@
:certname s/Str
:producer-ts (s/maybe pls/Timestamp)
:callback (s/=> s/Any s/Any)
:command-stream java.io.InputStream})
:command-stream java.io.InputStream
:compression s/Str})
(s/defn create-command-req :- command-req-schema
"Validating constructor function for command requests"
......@@ -198,6 +207,7 @@
version :- s/Int
certname :- s/Str
producer-ts :- (s/maybe s/Str)
compression :- s/Str
callback :- (s/=> s/Any s/Any)
command-stream :- java.io.InputStream]
{:command command
......@@ -205,32 +215,48 @@
:certname certname
:producer-ts (when producer-ts
(pdbtime/from-string producer-ts))
:compression compression
:callback callback
:command-stream command-stream})
(defrecord CommandRef [id command version certname received producer-ts callback delete?])
(defrecord CommandRef [id command version certname received producer-ts callback delete? compression])
(defn cmdref->entry [{:keys [id received] :as cmdref}]
(stock/entry id (serialize-metadata received cmdref)))
(defn entry->cmdref [entry]
(let [{:keys [received command version certname producer-ts]} (-> entry
stock/entry-meta
parse-metadata)]
(let [{:keys [received command version
certname producer-ts compression]} (-> entry
stock/entry-meta
parse-metadata)]
(map->CommandRef {:id (stock/entry-id entry)
:command command
:version version
:certname certname
:received received
:producer-ts producer-ts
:callback identity})))
:callback identity
:compression compression})))
(defn wrap-decompression-stream
[compression command-stream]
(condp = compression
nil command-stream
"" command-stream
"gz" (GzipCompressorInputStream. command-stream)
(throw+ {:kind ::parse-error} nil
(trs "Unsupported compression format for command: {0}"
compression))))
(defn cmdref->cmd [q cmdref]
(let [entry (cmdref->entry cmdref)]
(let [compression (:compression cmdref)
entry (cmdref->entry cmdref)]
(with-open [command-stream (stock/stream q entry)]
(assoc cmdref
:payload (stream->json command-stream)
:entry entry))))
:payload (stream->json (wrap-decompression-stream
compression
command-stream))
:entry entry))))
(defn cons-attempt [cmdref exception]
(update cmdref :attempts conj {:exception exception
......@@ -278,7 +304,7 @@
current-time
command-req)]
(-> command-req
(select-keys [:command :version :certname :producer-ts :callback])
(select-keys [:command :version :certname :producer-ts :callback :compression])
(assoc :id (stock/entry-id entry)
:received (kitchensink/timestamp current-time))
map->CommandRef)))
......
......@@ -78,7 +78,8 @@
:values {:foo "the foo"
:bar "the bar"
:baz "the baz"}
:producer_timestamp (to-string (now))}))
:producer_timestamp (to-string (now))})
"")
@(block-until-results 200 (first (get-factsets "foo.local")))
......@@ -113,7 +114,8 @@
{:certname "foo.local"
:environment "DEV"
:values {:a "a" :b "b" :c "c"}
:producer_timestamp (to-string (now))}))
:producer_timestamp (to-string (now))})
"")
@(block-until-results 200 (first (get-factsets "foo.local")))
(let [exp ["a" "b" "c"]
......
......@@ -48,19 +48,19 @@
(are [cmd-info metadata-str] (= cmd-info (#'dlo/parse-cmd-filename metadata-str))
{:received r0 :version 0 :command "replace catalog" :certname "foo" :producer-ts nil}
{:received r0 :version 0 :command "replace catalog" :certname "foo" :producer-ts nil :compression ""}
"0-0_catalog_0_foo.json"
{:received r0 :version 0 :command "replace catalog" :certname "foo.json" :producer-ts nil}
{:received r0 :version 0 :command "replace catalog" :certname "foo.json" :producer-ts nil :compression ""}
"0-0_catalog_0_foo.json.json"
{:received r10 :version 10 :command "replace catalog" :certname "foo" :producer-ts nil}
{:received r10 :version 10 :command "replace catalog" :certname "foo" :producer-ts nil :compression ""}
"10-10_catalog_10_foo.json"
{:received r10 :version 42 :command "replace catalog" :certname "foo" :producer-ts nil}
{:received r10 :version 42 :command "replace catalog" :certname "foo" :producer-ts nil :compression ""}
"10-10_catalog_42_foo.json"
{:received r10 :version 10 :command "unknown" :certname "foo" :producer-ts nil}
{:received r10 :version 10 :command "unknown" :certname "foo" :producer-ts nil :compression ""}
"10-10_unknown_10_foo.json")
(is (not (#'dlo/parse-cmd-filename "0-0_foo_0_foo.json")))))
......
......@@ -126,7 +126,7 @@
:total))
(defn failed-catalog-req [version certname payload]
(queue/create-command-req "replace catalog" version certname nil identity
(queue/create-command-req "replace catalog" version certname nil "" identity
(tqueue/coerce-to-stream payload)))
(deftest command-processor-integration
......@@ -393,6 +393,7 @@
version-num
certname
(ks/timestamp (now))
""
identity
(tqueue/coerce-to-stream "bad stuff"))))
(is (empty? (query-to-vec "SELECT * FROM catalogs")))
......@@ -921,6 +922,7 @@
latest-facts-version
"foo.example.com"
(ks/timestamp (now))
""
identity
(tqueue/coerce-to-stream "bad stuff"))))
(is (empty? (query-to-vec "SELECT * FROM facts")))
......@@ -933,6 +935,7 @@
(handle-message (queue/store-command q (queue/create-command-req "replace facts"
2
"foo.example.com"
""
(ks/timestamp (now))
identity
(tqueue/coerce-to-stream "bad stuff"))))
......@@ -1462,7 +1465,8 @@
(tqueue/coerce-to-stream
{:environment "DEV" :certname "foo.local"
:values {:foo "foo"}
:producer_timestamp (to-string (now))}))
:producer_timestamp (to-string (now))})
"")
@received-cmd?
(is (= {:received-commands 1 :executed-commands 0} (stats)))
(deliver go-ahead-and-execute true)
......@@ -1486,7 +1490,8 @@
"foo.local"
nil
(tqueue/coerce-to-stream
{:certname "foo.local" :producer_timestamp input-stamp}))
{:certname "foo.local" :producer_timestamp input-stamp})
"")
(is (svc-utils/wait-for-server-processing svc-utils/*server* default-timeout-ms)
(format "Server didn't process received commands after %dms" default-timeout-ms))
......@@ -1526,7 +1531,8 @@
"foo.local"
nil
(tqueue/coerce-to-stream
{:certname "foo.local" :producer_timestamp producer-ts}))
{:certname "foo.local" :producer_timestamp producer-ts})
"")
(let [received-uuid (async/alt!! response-chan ([msg] (:producer-timestamp msg))
(async/timeout 10000) ::timeout)]
......@@ -1580,6 +1586,7 @@
(assoc :producer_timestamp old-producer-ts
:certname "foo.com")
tqueue/coerce-to-stream)
""
#(deliver cmd-1 %))
(enqueue-command (command-names :replace-catalog)
......@@ -1589,6 +1596,7 @@
(-> base-cmd
(assoc :producer_timestamp old-producer-ts)
tqueue/coerce-to-stream)
""
#(deliver cmd-2 %))
(enqueue-command (command-names :replace-catalog)
......@@ -1598,6 +1606,7 @@
(-> base-cmd
(assoc :producer_timestamp new-producer-ts)
tqueue/coerce-to-stream)
""
#(deliver cmd-3 %))
(.release semaphore)
......
......@@ -32,8 +32,9 @@
[puppetlabs.puppetdb.testutils.queue :as tqueue]
[puppetlabs.puppetdb.queue :as queue])
(:import [clojure.lang ExceptionInfo]
[java.io ByteArrayInputStream]
[java.util.concurrent Semaphore]))
[java.io ByteArrayInputStream ByteArrayOutputStream]
[java.util.concurrent Semaphore]
(java.util.zip GZIPOutputStream)))
(def endpoints [[:v1 "/v1"]])
......@@ -46,7 +47,9 @@
"Makes a post body request"
[path params payload]
(let [body (when-not (nil? payload)
(ByteArrayInputStream. (.getBytes payload "UTF-8")))]
(ByteArrayInputStream. (if (string? payload)
(.getBytes payload "UTF-8")
payload)))]
(post-request path params {"content-type" "application/json"
"accept" "application/json"} body)))
......@@ -71,24 +74,54 @@
{:foo 1
:bar 2})
checksum (kitchensink/utf8-string->sha1 payload)
req (internal-request {"payload" payload "checksum" checksum})
[command-chan app] (create-handler q)
response (app (post-request* endpoint
{"checksum" checksum
"version" (str (get min-supported-commands "replace facts"))
"certname" "foo.com"
"command" "replace facts"}
payload))]
(assert-success! response)
version (str (get min-supported-commands "replace facts"))
request-params {"checksum" checksum
"version" version
"certname" "foo.com"
"command" "replace facts"}]
(testing "for raw json requests"
(let [response (app (post-request* endpoint
request-params
payload))]
(assert-success! response)
(let [cmdref (async/<!! command-chan)]
(is (= {:foo 1
:bar 2}
(:payload (queue/cmdref->cmd q cmdref)))))
(is (= (content-type response)
http/json-response-content-type))
(is (uuid-in-response? response))))
(testing "for gzipped json requests"
(let [gzipped-payload-stream (ByteArrayOutputStream.)
_ (with-open [gzip-output-stream (GZIPOutputStream.
gzipped-payload-stream)]
(->> payload
(.getBytes)
(.write gzip-output-stream)))
gzipped-payload (.toByteArray gzipped-payload-stream)
request (post-request* endpoint
request-params
gzipped-payload)
request-with-content-encoding (assoc-in request
[:headers
"content-encoding"]
"gzip")
response (app request-with-content-encoding)]
(assert-success! response)
(let [cmdref (async/<!! command-chan)]
(is (= {:foo 1
:bar 2}
(:payload (queue/cmdref->cmd q cmdref)))))
(let [cmdref (async/<!! command-chan)]
(is (= {:foo 1
:bar 2}
(:payload (queue/cmdref->cmd q cmdref)))))
(is (= (content-type response)
http/json-response-content-type))
(is (uuid-in-response? response)))))
(is (= (content-type response)
http/json-response-content-type))
(is (uuid-in-response? response)))))))
(testing "should return status-bad-request when missing payload"
(tqueue/with-stockpile q
......@@ -253,12 +286,12 @@
(defn handler-with-max [q command-chan max-command-size]
(#'tgt/enqueue-command-handler
(fn [command version certname producer-ts stream callback]
(fn [command version certname producer-ts stream compression callback]
(cmd/do-enqueue-command
q
command-chan
(Semaphore. 100)
(queue/create-command-req command version certname producer-ts callback stream)))
(queue/create-command-req command version certname producer-ts compression callback stream)))
max-command-size))
(deftest enqueue-max-command-size
......
......@@ -143,6 +143,23 @@
(let [wrapped-fn (verify-content-type identity ["application/json"])]
(is (= (wrapped-fn test-req) test-req)))))))
(deftest verify-content-encoding-test
(testing "with content-encoding of gzip"
(let [test-req {:request-method :post
:content-type "application/json"
:headers {"content-encoding" "gzip"}}]
(testing "should succeed with matching content encoding"
(let [wrapped-fn (verify-content-encoding identity ["gzip"])]
(is (= (wrapped-fn test-req) test-req))))
(testing "should fail with no matching content encoding"
(let [wrapped-fn (verify-content-encoding identity ["compress" "deflate"])]
(is (= (wrapped-fn test-req)
{:status 415
:headers {"Content-Type" http/error-response-content-type}
:body "content encoding gzip not supported"})))))))
(deftest whitelist-middleware
(testing "should log on reject"
(let [wl (temp-file "whitelist-log-reject")]
......
......@@ -19,6 +19,7 @@
version
(or certname name)
nil
""
identity
(tqueue/coerce-to-stream catalog)))
......@@ -32,11 +33,12 @@
(doseq [bad-char (conj constants/filename-forbidden-characters \_)]
(is (= "sanitize-me" (sanitize-certname (format "sanitize%cme" bad-char))))))
(defn cmd-req-stub [producer-ts command version certname]
(defn cmd-req-stub [producer-ts command version certname compression]
{:command command
:version version
:certname certname
:producer-ts producer-ts})
:producer-ts producer-ts
:compression compression})
(deftest test-metadata-serializer
(let [recvd (now)
......@@ -48,36 +50,48 @@
(testing "certnames are sanitized"
(let [cname "foo_bar/baz"
safe-cname "foo-bar-baz"
cname-hash (kitchensink/utf8-string->sha1 cname)]
cname-hash (kitchensink/utf8-string->sha1 cname)
compression ""]
(is (= (format "%d_%s_%d_%s_%s.json" recvd-long cmd-abbrev cmd-ver safe-cname cname-hash)
(serialize-metadata recvd (cmd-req-stub nil cmd cmd-ver cname))))))
(serialize-metadata recvd (cmd-req-stub nil cmd cmd-ver cname compression))))))
(testing "long certnames are truncated"
(let [long-cname (apply str "trol" (repeat 1000 "lo"))
trunc-cname (subs long-cname 0 (truncated-certname-length recvd cmd-abbrev cmd-ver))
cname-hash (kitchensink/utf8-string->sha1 long-cname)]
cname-hash (kitchensink/utf8-string->sha1 long-cname)
compression ""]
(is (= (format "%d_%s_%d_%s_%s.json" recvd-long cmd-abbrev cmd-ver trunc-cname cname-hash)
(serialize-metadata recvd (cmd-req-stub nil cmd cmd-ver long-cname))))
(is (<= (utf8-length (serialize-metadata recvd (cmd-req-stub nil cmd cmd-ver long-cname))) 255))))
(serialize-metadata recvd (cmd-req-stub nil cmd cmd-ver long-cname compression))))
(is (<= (utf8-length (serialize-metadata recvd (cmd-req-stub nil cmd cmd-ver long-cname compression))) 255))))
(testing "multi-byte characters in UTF-8 are counted correctly"
(let [cname-max-length (max-certname-length recvd cmd cmd-ver)
disapproval-monster (apply str (repeat (inc (/ cname-max-length 4)) "ಠ_"))]
(is (<= (utf8-length (serialize-metadata recvd (cmd-req-stub nil cmd cmd-ver disapproval-monster))) 255))))
disapproval-monster (apply str (repeat (inc (/ cname-max-length 4)) "ಠ_"))
compression ""]
(is (<= (utf8-length (serialize-metadata recvd (cmd-req-stub nil cmd cmd-ver disapproval-monster compression))) 255))))
(testing "sanitized certnames are truncated to leave room for hash"
(let [cname-trunc-length (truncated-certname-length recvd cmd-abbrev cmd-ver)
tricky-cname (apply str "____" (repeat cname-trunc-length "o"))
cname-hash (kitchensink/utf8-string->sha1 tricky-cname)
trunc-cname (subs (sanitize-certname tricky-cname) 0 cname-trunc-length)]
trunc-cname (subs (sanitize-certname tricky-cname) 0 cname-trunc-length)
compression ""]
(is (= (format "%d_%s_%d_%s_%s.json" recvd-long cmd-abbrev cmd-ver trunc-cname cname-hash)
(serialize-metadata recvd (cmd-req-stub nil cmd cmd-ver tricky-cname))))
(is (<= (utf8-length (serialize-metadata recvd (cmd-req-stub nil cmd cmd-ver tricky-cname))) 255))))
(serialize-metadata recvd (cmd-req-stub nil cmd cmd-ver tricky-cname compression))))
(is (<= (utf8-length (serialize-metadata recvd (cmd-req-stub nil cmd cmd-ver tricky-cname compression))) 255))))
(testing "short & safe certnames are preserved and the hash is omitted"
(let [cname "bender.myowncasino.moon"]
(let [cname "bender.myowncasino.moon"
compression ""]
(is (= (format "%d_%s_%d_%s.json" recvd-long cmd-abbrev cmd-ver cname)
(serialize-metadata recvd (cmd-req-stub nil cmd cmd-ver cname))))))))
(serialize-metadata recvd (cmd-req-stub nil cmd cmd-ver cname compression))))))
(testing "compression format is added"
(let [cname "compress.me"
compression "gz"]
(is (= (format "%d_%s_%d_%s.json.%s"
recvd-long cmd-abbrev cmd-ver cname compression)
(serialize-metadata recvd (cmd-req-stub nil cmd cmd-ver cname compression))))))))
(deftest test-metadata
(tqueue/with-stockpile q
......@@ -86,7 +100,7 @@
_ (Thread/sleep 1)
cmdref (->> {:message "payload"}
tqueue/coerce-to-stream
(create-command-req "replace facts" 1 "foo.com" nil identity)
(create-command-req "replace facts" 1 "foo.com" nil "" identity)
(store-command q))
command (cmdref->cmd q cmdref)]
(is (= {:command "replace facts"
......@@ -382,31 +396,37 @@
(deftest test-metadata-parsing
(let [received (time/now)