encoder.rb 4.44 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
require 'zlib'

module Aws
  module EventStream 

    # This class provides #encode method for encoding
    # Aws::EventStream::Message into binary.
    #
    # * {#encode} - encode Aws::EventStream::Message into binary
    #   when output IO-like object is provided, binary string
    #   would be written to IO. If not, the encoded binary string
    #   would be returned directly
    #
    # ## Examples
    #
    #   message = Aws::EventStream::Message.new(
    #     headers: {
    #       "foo" => Aws::EventStream::HeaderValue.new(
    #         value: "bar", type: "string"
    #        )
    #     },
    #     payload: "payload"
    #   )
    #   encoder = Aws::EventsStream::Encoder.new
    #   file = Tempfile.new
    #
    #   # encode into IO ouput
    #   encoder.encode(message, file)
    #
    #   # get encoded binary string
    #   encoded_message = encoder.encode(message)
    #
    #   file.read == encoded_message
    #   # => true
    #
    class Encoder

      # bytes of total overhead in a message, including prelude
      # and 4 bytes total message crc checksum
      OVERHEAD_LENGTH = 16

      # Maximum header length allowed (after encode) 128kb
      MAX_HEADERS_LENGTH = 131072

      # Maximum payload length allowed (after encode) 16mb
      MAX_PAYLOAD_LENGTH = 16777216

      # Encodes Aws::EventStream::Message to output IO when
      #   provided, else return the encoded binary string
      #
      # @param [Aws::EventStream::Message] message
      #
      # @param [IO#write, nil] io An IO-like object that
      #   responds to `#write`, encoded message will be
      #   written to this IO when provided
      #
      # @return [nil, String] when output IO is provided,
      #   encoded message will be written to that IO, nil
      #   will be returned. Else, encoded binary string is
      #   returned.
      def encode(message, io = nil)
        encoded = encode_message(message).read
        if io
          io.write(encoded)
          io.close
        else
          encoded
        end
      end

      # Encodes an Aws::EventStream::Message
      #   into Aws::EventStream::BytesBuffer
      #
      # @param [Aws::EventStream::Message] msg
      #
      # @return [Aws::EventStream::BytesBuffer]
      def encode_message(message)
        # create context buffer with encode headers
        ctx_buffer = encode_headers(message)
        headers_len = ctx_buffer.bytesize
        # encode payload
        if message.payload.length > MAX_PAYLOAD_LENGTH
          raise Aws::EventStream::Errors::EventPayloadLengthExceedError.new
        end
        ctx_buffer << message.payload.read
        total_len = ctx_buffer.bytesize + OVERHEAD_LENGTH

        # create message buffer with prelude section
        buffer = prelude(total_len, headers_len)

        # append message context (headers, payload)
        buffer << ctx_buffer.read
        # append message checksum
        buffer << pack_uint32(Zlib.crc32(buffer.read))

        # write buffered message to io
        buffer.rewind
        buffer
      end

      # Encodes headers part of an Aws::EventStream::Message
      #   into Aws::EventStream::BytesBuffer
      #
      # @param [Aws::EventStream::Message] msg
      #
      # @return [Aws::EventStream::BytesBuffer]
      def encode_headers(msg)
        buffer = BytesBuffer.new('')
        msg.headers.each do |k, v|
          # header key
          buffer << pack_uint8(k.bytesize)
          buffer << k

          # header value
          pattern, val_len, idx = Types.pattern[v.type]
          buffer << pack_uint8(idx)
          # boolean types doesn't need to specify value
          next if !!pattern == pattern
          buffer << pack_uint16(v.value.bytesize) unless val_len
          pattern ? buffer << [v.value].pack(pattern) :
            buffer << v.value
        end
        if buffer.bytesize > MAX_HEADERS_LENGTH
          raise Aws::EventStream::Errors::EventHeadersLengthExceedError.new
        end
        buffer
      end

      private

      def prelude(total_len, headers_len)
        BytesBuffer.new(pack_uint32([
          total_len,
          headers_len,
          Zlib.crc32(pack_uint32([total_len, headers_len]))
        ]))
      end

      # overhead encode helpers

      def pack_uint8(val)
        [val].pack('C')
      end

      def pack_uint16(val)
        [val].pack('S>')
      end

      def pack_uint32(val)
        if val.respond_to?(:each)
          val.pack('N*')
        else
          [val].pack('N')
        end
      end

    end

  end
end