knowledge_base.py 11.6 KB
Newer Older
1 2 3 4 5 6 7 8 9
# -*- coding: utf-8 -*-
"""The artifact knowledge base object.

The knowledge base is filled by user provided input and the pre-processing
phase. It is intended to provide successive phases, like the parsing and
analysis phases, with essential information like e.g. the timezone and
codepage of the source data.
"""

10 11 12 13
from __future__ import unicode_literals

import codecs
import datetime
14

15
from plaso.containers import artifacts
16
from plaso.engine import logger
17 18 19
from plaso.lib import py2to3

import pytz  # pylint: disable=wrong-import-order
20 21 22 23 24


class KnowledgeBase(object):
  """Class that implements the artifact knowledge base.

  The knowledge base stores the codepage, time zone, environment variables,
  free-form values and, per session identifier, the hostname and the user
  accounts.
  """

  # Session identifier that represents the active session.
  CURRENT_SESSION = 0

  # Characters treated as path segment separators when matching a path
  # against a user directory.
  _PATH_SEPARATORS = '/\\'

  def __init__(self):
    """Initializes a knowledge base."""
    super(KnowledgeBase, self).__init__()
    self._codepage = 'cp1252'
    self._environment_variables = {}
    self._hostnames = {}
    self._time_zone = pytz.UTC
    self._user_accounts = {}
    self._values = {}

  @property
  def codepage(self):
    """str: codepage of the current session."""
    return self.GetValue('codepage', default_value=self._codepage)

  @property
  def hostname(self):
    """str: hostname of the current session."""
    return self.GetHostname(session_identifier=self.CURRENT_SESSION)

  @property
  def timezone(self):
    """datetime.tzinfo: timezone of the current session."""
    return self._time_zone

  @property
  def user_accounts(self):
    """list[UserAccountArtifact]: user accounts of the current session."""
    # In Python 3 dict.values() returns a view instead of a list, hence it is
    # copied into a list to match the documented type.
    return list(self._user_accounts.get(self.CURRENT_SESSION, {}).values())

  @property
  def year(self):
    """int: year of the current session."""
    return self.GetValue('year', default_value=0)

  def AddUserAccount(self, user_account, session_identifier=CURRENT_SESSION):
    """Adds a user account.

    Args:
      user_account (UserAccountArtifact): user account artifact.
      session_identifier (Optional[str]): session identifier, where
          CURRENT_SESSION represents the active session.

    Raises:
      KeyError: if the user account already exists.
    """
    user_accounts = self._user_accounts.setdefault(session_identifier, {})
    if user_account.identifier in user_accounts:
      raise KeyError('User account: {0:s} already exists.'.format(
          user_account.identifier))

    user_accounts[user_account.identifier] = user_account

  def AddEnvironmentVariable(self, environment_variable):
    """Adds an environment variable.

    The name of the environment variable is stored case insensitive.

    Args:
      environment_variable (EnvironmentVariableArtifact): environment variable
          artifact.

    Raises:
      KeyError: if the environment variable already exists.
    """
    name = environment_variable.name.upper()
    if name in self._environment_variables:
      raise KeyError('Environment variable: {0:s} already exists.'.format(
          environment_variable.name))

    self._environment_variables[name] = environment_variable

  def GetEnvironmentVariable(self, name):
    """Retrieves an environment variable.

    Args:
      name (str): name of the environment variable, which is matched
          case insensitive.

    Returns:
      EnvironmentVariableArtifact: environment variable artifact or None
          if there was no value set for the given name.
    """
    return self._environment_variables.get(name.upper(), None)

  def GetEnvironmentVariables(self):
    """Retrieves the environment variables.

    Returns:
      list[EnvironmentVariableArtifact]: environment variable artifacts.
    """
    # In Python 3 dict.values() returns a view instead of a list, hence it is
    # copied into a list to match the documented type.
    return list(self._environment_variables.values())

  def GetHostname(self, session_identifier=CURRENT_SESSION):
    """Retrieves the hostname of a session.

    Args:
      session_identifier (Optional[str]): session identifier, where
          CURRENT_SESSION represents the active session.

    Returns:
      str: hostname or an empty string if no hostname artifact, or no name,
          is available for the session.
    """
    hostname_artifact = self._hostnames.get(session_identifier, None)
    if not hostname_artifact:
      return ''

    return hostname_artifact.name or ''

  # TODO: remove this function it is incorrect.
  def GetStoredHostname(self):
    """Retrieves the stored hostname.

    Note that the number of stored hostnames is used as the lookup key.

    Returns:
      object: value stored in the hostnames under that key, which is what
          SetHostname or ReadSystemConfigurationArtifact stored and not
          a string, or None if not available.
    """
    store_number = len(self._hostnames)
    return self._hostnames.get(store_number, None)

  def GetSystemConfigurationArtifact(self, session_identifier=CURRENT_SESSION):
    """Retrieves the knowledge base as a system configuration artifact.

    Args:
      session_identifier (Optional[str]): session identifier, where
          CURRENT_SESSION represents the active session.

    Returns:
      SystemConfigurationArtifact: system configuration artifact.
    """
    system_configuration = artifacts.SystemConfigurationArtifact()

    system_configuration.code_page = self.GetValue(
        'codepage', default_value=self._codepage)

    system_configuration.hostname = self._hostnames.get(
        session_identifier, None)

    system_configuration.keyboard_layout = self.GetValue('keyboard_layout')
    system_configuration.operating_system = self.GetValue('operating_system')
    system_configuration.operating_system_product = self.GetValue(
        'operating_system_product')
    system_configuration.operating_system_version = self.GetValue(
        'operating_system_version')

    # The time zone name is determined for a fixed date.
    # NOTE(review): tzname() can yield an abbreviation instead of the zone
    # name; confirm SetTimeZone accepts the value on read-back.
    date_time = datetime.datetime(2017, 1, 1)
    time_zone = self._time_zone.tzname(date_time)

    if time_zone and isinstance(time_zone, py2to3.BYTES_TYPE):
      time_zone = time_zone.decode('ascii')

    system_configuration.time_zone = time_zone

    user_accounts = self._user_accounts.get(session_identifier, {})
    # In Python 3 dict.values() returns a type dict_values, which will cause
    # the JSON serializer to raise a TypeError.
    system_configuration.user_accounts = list(user_accounts.values())

    return system_configuration

  def GetUsernameByIdentifier(
      self, user_identifier, session_identifier=CURRENT_SESSION):
    """Retrieves the username based on a user identifier.

    Args:
      user_identifier (str): user identifier, either a UID or SID.
      session_identifier (Optional[str]): session identifier, where
          CURRENT_SESSION represents the active session.

    Returns:
      str: username or an empty string if the user account, or its username,
          is not available.
    """
    user_accounts = self._user_accounts.get(session_identifier, {})
    user_account = user_accounts.get(user_identifier, None)
    if not user_account:
      return ''

    return user_account.username or ''

  def GetUsernameForPath(self, path):
    """Retrieves a username for a specific path.

    This is determining if a specific path is within a user's directory and
    returning the username of the user if so. The comparison is case
    insensitive and only considers the user accounts of the current session.

    Args:
      path (str): path.

    Returns:
      str: username or None if the path does not appear to be within a user's
          directory.
    """
    path = path.lower()

    user_accounts = self._user_accounts.get(self.CURRENT_SESSION, {})
    for user_account in user_accounts.values():
      if not user_account.user_directory:
        continue

      user_directory = user_account.user_directory.lower()
      if not path.startswith(user_directory):
        continue

      # Only accept a match that ends on a path segment boundary, otherwise
      # "/home/bob" would claim paths within "/home/bobby".
      remainder = path[len(user_directory):]
      if (not remainder or remainder[0] in self._PATH_SEPARATORS or
          user_directory[-1] in self._PATH_SEPARATORS):
        return user_account.username

    return None

  def GetValue(self, identifier, default_value=None):
    """Retrieves a value by identifier.

    Args:
      identifier (str): case insensitive unique identifier for the value.
      default_value (object): default value.

    Returns:
      object: value or default value if not available.

    Raises:
      TypeError: if the identifier is not a string type.
    """
    if not isinstance(identifier, py2to3.STRING_TYPES):
      raise TypeError('Identifier not a string type.')

    identifier = identifier.lower()
    return self._values.get(identifier, default_value)

  def HasUserAccounts(self):
    """Determines if the knowledge base contains user accounts.

    Returns:
      bool: True if the knowledge base contains user accounts for the current
          session.
    """
    return bool(self._user_accounts.get(self.CURRENT_SESSION, {}))

  def ReadSystemConfigurationArtifact(
      self, system_configuration, session_identifier=CURRENT_SESSION):
    """Reads the knowledge base values from a system configuration artifact.

    Note that this overwrites existing values in the knowledge base.

    Args:
      system_configuration (SystemConfigurationArtifact): system configuration
          artifact.
      session_identifier (Optional[str]): session identifier, where
          CURRENT_SESSION represents the active session.
    """
    if system_configuration.code_page:
      try:
        self.SetCodepage(system_configuration.code_page)
      except ValueError:
        logger.warning(
            'Unsupported codepage: {0!s}, defaulting to {1:s}'.format(
                system_configuration.code_page, self._codepage))

    self._hostnames[session_identifier] = system_configuration.hostname

    self.SetValue('keyboard_layout', system_configuration.keyboard_layout)

    self.SetValue('operating_system', system_configuration.operating_system)
    self.SetValue(
        'operating_system_product',
        system_configuration.operating_system_product)
    self.SetValue(
        'operating_system_version',
        system_configuration.operating_system_version)

    if system_configuration.time_zone:
      try:
        self.SetTimeZone(system_configuration.time_zone)
      except ValueError:
        logger.warning(
            'Unsupported time zone: {0:s}, defaulting to {1:s}'.format(
                system_configuration.time_zone, self.timezone.zone))

    # The user accounts are keyed by identifier, like AddUserAccount does,
    # so that GetUsernameByIdentifier can find them.
    self._user_accounts[session_identifier] = {
        user_account.identifier: user_account
        for user_account in system_configuration.user_accounts or []}

  def SetCodepage(self, codepage):
    """Sets the codepage.

    Args:
      codepage (str): codepage.

    Raises:
      ValueError: if the codepage is not supported.
    """
    try:
      codecs.getencoder(codepage)
    except (LookupError, TypeError):
      # A TypeError is raised for a codepage that is not a string, which is
      # reported as unsupported as well.
      raise ValueError('Unsupported codepage: {0!s}'.format(codepage))

    self._codepage = codepage

  def SetEnvironmentVariable(self, environment_variable):
    """Sets an environment variable.

    Note that this overwrites an existing environment variable with
    the same case insensitive name.

    Args:
      environment_variable (EnvironmentVariableArtifact): environment variable
          artifact.
    """
    name = environment_variable.name.upper()
    self._environment_variables[name] = environment_variable

  def SetHostname(self, hostname, session_identifier=CURRENT_SESSION):
    """Sets a hostname.

    Args:
      hostname (HostnameArtifact): hostname artifact.
      session_identifier (Optional[str]): session identifier, where
          CURRENT_SESSION represents the active session.
    """
    self._hostnames[session_identifier] = hostname

  def SetTimeZone(self, time_zone):
    """Sets the time zone.

    Args:
      time_zone (str): time zone.

    Raises:
      ValueError: if the timezone is not supported.
    """
    try:
      self._time_zone = pytz.timezone(time_zone)
    except (AttributeError, pytz.UnknownTimeZoneError):
      raise ValueError('Unsupported timezone: {0!s}'.format(time_zone))

  def SetValue(self, identifier, value):
    """Sets a value by identifier.

    Args:
      identifier (str): case insensitive unique identifier for the value.
      value (object): value.

    Raises:
      TypeError: if the identifier is not a string type.
    """
    if not isinstance(identifier, py2to3.STRING_TYPES):
      raise TypeError('Identifier not a string type.')

    identifier = identifier.lower()
    self._values[identifier] = value