| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- # IRIS Source Code
- # Copyright (C) 2022 - DFIR IRIS Team
- # contact@dfir-iris.org
- # Copyright (C) 2021 - Airbus CyberSecurity (SAS)
- # ir@cyberactionlab.net
- #
- # This program is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 3 of the License, or (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License
- # along with this program; if not, write to the Free Software Foundation,
- # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- # TODO should probably dispatch the methods provided in this file in the different namespaces
- import base64
- import datetime
- import hashlib
- import logging as log
- import marshmallow
- import shutil
- import weakref
- from cryptography.exceptions import InvalidSignature
- from cryptography.hazmat.primitives import hashes
- from cryptography.hazmat.primitives import hmac
- from flask_login import current_user
- from pathlib import Path
- from pyunpack import Archive
- from sqlalchemy.orm.attributes import flag_modified
- from app import app
- from app import db
- class FileRemover(object):
- def __init__(self):
- self.weak_references = dict() # weak_ref -> filepath to remove
- def cleanup_once_done(self, response_d, filepath):
- wr = weakref.ref(response_d, self._do_cleanup)
- self.weak_references[wr] = filepath
- def _do_cleanup(self, wr):
- filepath = self.weak_references[wr]
- shutil.rmtree(filepath, ignore_errors=True)
- def decompress_7z(filename: Path, output_dir):
- """
- Decompress a 7z file in specified output directory
- :param filename: Filename to decompress
- :param output_dir: Target output dir
- :return: True if uncompress
- """
- try:
- a = Archive(filename=filename)
- a.extractall(directory=output_dir, auto_create_dir=True)
- except Exception as e:
- log.warning(e)
- return False
- return True
- def add_obj_history_entry(obj, action, commit=False):
- if hasattr(obj, 'modification_history'):
- if isinstance(obj.modification_history, dict):
- obj.modification_history.update({
- datetime.datetime.now().timestamp(): {
- 'user': current_user.user,
- 'user_id': current_user.id,
- 'action': action
- }
- })
- else:
- obj.modification_history = {
- datetime.datetime.now().timestamp(): {
- 'user': current_user.user,
- 'user_id': current_user.id,
- 'action': action
- }
- }
- flag_modified(obj, "modification_history")
- if commit:
- db.session.commit()
- return obj
- def file_sha256sum(file_path):
- if not Path(file_path).is_file():
- return None
- sha256_hash = hashlib.sha256()
- with open(file_path, "rb") as f:
- # Read and update hash string value in blocks of 4K
- for byte_block in iter(lambda: f.read(4096), b""):
- sha256_hash.update(byte_block)
- return sha256_hash.hexdigest().upper()
- def stream_sha256sum(stream):
- return hashlib.sha256(stream).hexdigest().upper()
- @app.template_filter()
- def format_datetime(value, frmt):
- return datetime.datetime.fromtimestamp(float(value)).strftime(frmt)
- def hmac_sign(data):
- key = bytes(app.config.get("SECRET_KEY"), "utf-8")
- h = hmac.HMAC(key, hashes.SHA256())
- h.update(data)
- signature = base64.b64encode(h.finalize())
- return signature
- def hmac_verify(signature_enc, data):
- signature = base64.b64decode(signature_enc)
- key = bytes(app.config.get("SECRET_KEY"), "utf-8")
- h = hmac.HMAC(key, hashes.SHA256())
- h.update(data)
- try:
- h.verify(signature)
- return True
- except InvalidSignature:
- return False
- def str_to_bool(value):
- if value is None:
- return False
- if isinstance(value, bool):
- return value
- if isinstance(value, int):
- return bool(value)
- return value.lower() in ['true', '1', 'yes', 'y', 't']
- def assert_type_mml(input_var: any, field_name: str, type: type, allow_none: bool = False,
- max_len: int = None, max_val: int = None, min_val: int = None):
- if input_var is None:
- if allow_none is False:
- raise marshmallow.ValidationError("Invalid data - non null expected",
- field_name=field_name if field_name else "type")
- else:
- return True
-
- if isinstance(input_var, type):
- if max_len:
- if len(input_var) > max_len:
- raise marshmallow.ValidationError("Invalid data - max length exceeded",
- field_name=field_name if field_name else "type")
- if max_val:
- if input_var > max_val:
- raise marshmallow.ValidationError("Invalid data - max value exceeded",
- field_name=field_name if field_name else "type")
- if min_val:
- if input_var < min_val:
- raise marshmallow.ValidationError("Invalid data - min value exceeded",
- field_name=field_name if field_name else "type")
- return True
-
- try:
- if isinstance(type(input_var), type):
- return True
- except Exception as e:
- log.error(e)
- print(e)
-
- raise marshmallow.ValidationError("Invalid data type",
- field_name=field_name if field_name else "type")
|