Skip to content

Home

Modern MISPAPIWorkerFrontendMySqlRedis
Modern MISPAPIWorkerFrontendMySqlRedis

api-worker-integration

log = logging.getLogger(__name__) module-attribute

Encapsulates the logic of the API for the worker router

get_job_count(name) async

Returns the number of jobs in the specified worker queue :param name: Contains the name of the worker :type name: WorkerEnum :return: The amount of jobs in the worker queue :rtype: int

Source code in src/mmisp/worker/controller/worker_controller.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
async def get_job_count(name: WorkerEnum) -> int:
    """
    Returns the number of jobs in the specified worker queue
    :param name: Contains the name of the worker
    :type name: WorkerEnum
    :return: The amount of jobs in the worker queue
    :rtype: int
    """

    job_count: int = 0

    # _TaskInfo is not defined in the celery package
    reserved_tasks: dict[str, list[dict]] = celery_app.control.inspect().reserved()  # type: ignore
    worker_name: str = f"{name.value}@{platform.node()}"

    if reserved_tasks and worker_name in reserved_tasks:
        job_count += len(reserved_tasks[worker_name])

    job_count += await MMispRedis().get_enqueued_celery_tasks(name)
    return job_count

is_worker_active(name)

Checks if the specified worker is active :param name: Contains the name of the worker :type name: WorkerEnum :return: True if the worker active, else False :rtype: bool

Source code in src/mmisp/worker/controller/worker_controller.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def is_worker_active(name: WorkerEnum) -> bool:
    """
    Checks if the specified worker is active
    :param name: Contains the name of the worker
    :type name: WorkerEnum
    :return: True if the worker active, else False
    :rtype: bool

    """
    # _TaskInfo is not defined in the celery package
    report: dict[str, list[dict]] = celery_app.control.inspect().active()  # type: ignore

    if report:
        return bool(report.get(f"{name.value}@{platform.node()}"))
    return False

__convert_celery_task_state(job_state)

Converts a celery task state to a job status enum. :param job_state: The state of the job. :type job_state: str :return: returns a value of the job status enum :rtype: JobStatusEnum

Source code in src/mmisp/worker/controller/job_controller.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def __convert_celery_task_state(job_state: str) -> JobStatusEnum:
    """
    Converts a celery task state to a job status enum.
    :param job_state: The state of the job.
    :type job_state: str
    :return: returns a value of the job status enum
    :rtype: JobStatusEnum
    """
    state_map: dict[str, JobStatusEnum] = {
        states.PENDING: JobStatusEnum.QUEUED,
        JOB_CREATED_STATE: JobStatusEnum.QUEUED,
        states.RETRY: JobStatusEnum.QUEUED,
        states.STARTED: JobStatusEnum.IN_PROGRESS,
        states.SUCCESS: JobStatusEnum.SUCCESS,
        states.FAILURE: JobStatusEnum.FAILED,
        states.REVOKED: JobStatusEnum.REVOKED,
    }

    return state_map[job_state]

cancel_job(job_id)

Revokes a given job. :param job_id: The ID of the job :type job_id: str :return: Whether the revoke action was successful. :rtype: bool

Source code in src/mmisp/worker/controller/job_controller.py
54
55
56
57
58
59
60
61
62
63
def cancel_job(job_id: str) -> bool:
    """
    Revokes a given job.
    :param job_id: The ID of the job
    :type job_id: str
    :return: Whether the revoke action was successful.
    :rtype: bool
    """
    celery_app.control.revoke(job_id)
    return True

create_job(job, *args, **kwargs)

Enqueues a given celery task.

:param job: The celery Task to enqueue :type job: celery.Task :param args: Arguments passed to the job. :param kwargs: Arguments passed to the job. :return: The job_id of the created job and a success status. :rtype: CreateJobResponse

Source code in src/mmisp/worker/controller/job_controller.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def create_job(job: Task, *args, **kwargs) -> CreateJobResponse:
    """
    Enqueues a given celery task.

    :param job: The celery Task to enqueue
    :type job: celery.Task
    :param args: Arguments passed to the job.
    :param kwargs: Arguments passed to the job.
    :return: The job_id of the created job and a success status.
    :rtype: CreateJobResponse
    """
    try:
        result: AsyncResult = job.delay(*args, **kwargs)

    except OperationalError:
        return CreateJobResponse(job_id=None, success=False)

    return CreateJobResponse(job_id=result.id, success=True)

get_job_result(job_id)

Returns the result of the specified job :param job_id: is the id of the job :type job_id: str :return: a special ResponseData depending on the job :rtype: ResponseData

Source code in src/mmisp/worker/controller/job_controller.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def get_job_result(job_id: str) -> Any:
    """
    Returns the result of the specified job
    :param job_id: is the id of the job
    :type job_id: str
    :return: a special ResponseData depending on the job
    :rtype: ResponseData
    """
    async_result = celery_app.AsyncResult(job_id)
    if async_result.state == states.PENDING:
        raise NotExistentJobException

    if not async_result.ready():
        raise JobNotFinishedException

    # celery_app.AsyncResult(job_id).result is annotated as Any | Exception, but it can be only ResponseData or
    # Exception
    try:
        result = async_result.get()
        return result
    except Exception:
        assert async_result.traceback is not None
        return ExceptionResponse(message=str(async_result.traceback))

get_job_status(job_id)

Returns the status of the given job.

:param job_id: The ID of the job. :type job_id: str :return: The status of the job. :rtype: JobStatusEnum :raises NotExistentJobException: If there is no job with the specified ID.

Source code in src/mmisp/worker/controller/job_controller.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def get_job_status(job_id: str) -> JobStatusEnum:
    """
    Returns the status of the given job.

    :param job_id: The ID of the job.
    :type job_id: str
    :return: The status of the job.
    :rtype: JobStatusEnum
    :raises NotExistentJobException: If there is no job with the specified ID.
    """
    celery_state: str = celery_app.AsyncResult(job_id).state

    if celery_state == states.PENDING:
        raise NotExistentJobException(job_id=job_id)
    return __convert_celery_task_state(celery_state)

Module implements the Celery Application.

JOB_CREATED_STATE: str = 'ENQUEUED' module-attribute

Custom Celery task state for enqueued tasks.

celery_app: Celery = Celery(backend=CeleryConfig.result_backend, broker=CeleryConfig.broker_url) module-attribute

The celery instance

CeleryConfig

Encapsulates configuration for Celery.

Source code in src/mmisp/worker/controller/celery_client.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class CeleryConfig:
    """
    Encapsulates configuration for Celery.
    """

    broker_url: str = os.environ.get(
        "CELERY_BROKER_URL",
        (
            f"redis://"
            f"{':' + mmisp_redis_config_data.password + '@' if mmisp_redis_config_data.password else ''}"
            f"{mmisp_redis_config_data.host}:{mmisp_redis_config_data.port}/{mmisp_redis_config_data.db}"
        ),
    )
    result_backend: str = os.environ.get("CELERY_RESULT_BACKEND", broker_url)
    redis_username: str = os.environ.get("CELERY_REDIS_USERNAME", mmisp_redis_config_data.username)
    redis_password: str = os.environ.get("CELERY_REDIS_PASSWORD", mmisp_redis_config_data.password)
    task_routes: dict = {
        "mmisp.worker.jobs.correlation.*": {"queue": WorkerEnum.CORRELATE.value},
        "mmisp.worker.jobs.enrichment.*": {"queue": WorkerEnum.ENRICHMENT.value},
        "mmisp.worker.jobs.email.*": {"queue": WorkerEnum.SEND_EMAIL.value},
        "mmisp.worker.jobs.processfreetext.*": {"queue": WorkerEnum.PROCESS_FREE_TEXT.value},
    }
    imports: list[str] = [
        "mmisp.worker.jobs.enrichment.enrich_attribute_job",
        "mmisp.worker.jobs.enrichment.enrich_event_job",
        "mmisp.worker.jobs.correlation.clean_excluded_correlations_job",
        "mmisp.worker.jobs.correlation.correlate_value_job",
        "mmisp.worker.jobs.correlation.correlation_plugin_job",
        "mmisp.worker.jobs.correlation.regenerate_occurrences_job",
        "mmisp.worker.jobs.correlation.top_correlations_job",
        "mmisp.worker.jobs.email.alert_email_job",
        "mmisp.worker.jobs.email.contact_email_job",
        "mmisp.worker.jobs.email.posts_email_job",
        "mmisp.worker.jobs.processfreetext.processfreetext_job",
    ]
    task_track_started = True
    task_serializer = "pickle"
    result_serializer = "pickle"
    event_serializer = "pickle"
    accept_content = ["pickle"]

update_sent_state(sender=None, headers=None, **kwargs)

Function sets a custom task state for enqueued tasks. :param sender: The name of the task to update its state. :type sender: celery.Task :param headers: The task message headers :type headers: dict :param kwargs: Not needed

Source code in src/mmisp/worker/controller/celery_client.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@before_task_publish.connect
def update_sent_state(sender: Task | str | None = None, headers: dict | None = None, **kwargs) -> None:
    """
    Function sets a custom task state for enqueued tasks.
    :param sender: The name of the task to update its state.
    :type sender: celery.Task
    :param headers: The task message headers
    :type headers: dict
    :param kwargs: Not needed
    """
    if headers is None:
        raise ValueError("invalid headers")

    # the task may not exist if sent using `send_task` which
    # sends tasks by name, so fall back to the default result backend
    # if that is the case.
    backend: Any
    if sender is None:
        backend = celery_app.backend
    else:
        if isinstance(sender, str):
            task = celery_app.tasks.get(sender)
        else:
            task = celery_app.tasks.get(sender.name)
        backend = task.backend if task else celery_app.backend
    backend.store_result(headers["id"], None, JOB_CREATED_STATE)

ENV_PREFIX: str = 'MMISP' module-attribute

Prefix for the configuration environment variables.

ConfigData

Bases: BaseModel

Base class for configuration data.

Source code in src/mmisp/worker/config.py
28
29
30
31
32
33
class ConfigData(BaseModel):
    """
    Base class for configuration data.
    """

    pass

SystemConfigData

Bases: ConfigData

Encapsulates the general configuration of the MMISP Worker application.

Source code in src/mmisp/worker/config.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class SystemConfigData(ConfigData):
    """
    Encapsulates the general configuration of the MMISP Worker application.
    """

    api_port: PositiveInt = 5000
    """The port exposing the API."""

    api_key: str = ""
    """The key for the API."""

    api_host: str = "0.0.0.0"
    """The host the API binds to."""

    autostart_correlation_worker: bool = False
    """If True, the correlation worker will be started automatically at application start."""
    autostart_email_worker: bool = False
    """If True, the email worker will be started automatically at application start."""
    autostart_enrichment_worker: bool = False
    """If True, the enrichment worker will be started automatically at application start."""
    autostart_processfreetext_worker: bool = False
    """If True, the process free text worker will be started automatically at application start."""
    autostart_pull_worker: bool = False
    """If True, the pull worker will be started automatically at application start."""
    autostart_push_worker: bool = False
    """If True, the push worker will be started automatically at application start."""
    worker_termination_timeout: int = 30
    """The time in seconds to wait for the worker to terminate before kill."""

    def is_autostart_for_worker_enabled(self: Self, worker: WorkerEnum) -> Any:
        """
        Returns the autostart configuration for the specified worker.
        :param worker: The worker to check the autostart configuration for.
        """

        worker_config_map: dict[WorkerEnum, str] = {
            WorkerEnum.PULL: "autostart_pull_worker",
            WorkerEnum.PUSH: "autostart_push_worker",
            WorkerEnum.CORRELATE: "autostart_correlation_worker",
            WorkerEnum.ENRICHMENT: "autostart_enrichment_worker",
            WorkerEnum.SEND_EMAIL: "autostart_email_worker",
            WorkerEnum.PROCESS_FREE_TEXT: "autostart_processfreetext_worker",
        }

        return getattr(self, worker_config_map[worker])

api_host: str = '0.0.0.0' class-attribute instance-attribute

The host the API binds to.

api_key: str = '' class-attribute instance-attribute

The key for the API.

api_port: PositiveInt = 5000 class-attribute instance-attribute

The port exposing the API.

autostart_correlation_worker: bool = False class-attribute instance-attribute

If True, the correlation worker will be started automatically at application start.

autostart_email_worker: bool = False class-attribute instance-attribute

If True, the email worker will be started automatically at application start.

autostart_enrichment_worker: bool = False class-attribute instance-attribute

If True, the enrichment worker will be started automatically at application start.

autostart_processfreetext_worker: bool = False class-attribute instance-attribute

If True, the process free text worker will be started automatically at application start.

autostart_pull_worker: bool = False class-attribute instance-attribute

If True, the pull worker will be started automatically at application start.

autostart_push_worker: bool = False class-attribute instance-attribute

If True, the push worker will be started automatically at application start.

worker_termination_timeout: int = 30 class-attribute instance-attribute

The time in seconds to wait for the worker to terminate before kill.

is_autostart_for_worker_enabled(worker)

Returns the autostart configuration for the specified worker. :param worker: The worker to check the autostart configuration for.

Source code in src/mmisp/worker/config.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def is_autostart_for_worker_enabled(self: Self, worker: WorkerEnum) -> Any:
    """
    Returns the autostart configuration for the specified worker.
    :param worker: The worker to check the autostart configuration for.
    """

    worker_config_map: dict[WorkerEnum, str] = {
        WorkerEnum.PULL: "autostart_pull_worker",
        WorkerEnum.PUSH: "autostart_push_worker",
        WorkerEnum.CORRELATE: "autostart_correlation_worker",
        WorkerEnum.ENRICHMENT: "autostart_enrichment_worker",
        WorkerEnum.SEND_EMAIL: "autostart_email_worker",
        WorkerEnum.PROCESS_FREE_TEXT: "autostart_processfreetext_worker",
    }

    return getattr(self, worker_config_map[worker])

read_from_env()

Reads the configuration from the environment.

Source code in src/mmisp/worker/config.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def read_from_env() -> dict:
    """
    Reads the configuration from the environment.
    """

    env_dict: dict[
        str,
        str,
    ] = {
        "api_port": ENV_API_PORT,
        "api_key": ENV_API_KEY,
        "api_host": ENV_API_HOST,
        "autostart_correlation_worker": ENV_AUTOSTART_CORRELATION_WORKER,
        "autostart_email_worker": ENV_AUTOSTART_EMAIL_WORKER,
        "autostart_enrichment_worker": ENV_AUTOSTART_ENRICHMENT_WORKER,
        "autostart_exception_worker": ENV_AUTOSTART_EXCEPTION_WORKER,
        "autostart_processfreetext_worker": ENV_AUTOSTART_PROCESSFREETEXT_WORKER,
        "autostart_pull_worker": ENV_AUTOSTART_PULL_WORKER,
        "autostart_push_worker": ENV_AUTOSTART_PUSH_WORKER,
        "worker_termination_timeout": ENV_WORKER_TERMINATION_TIMEOUT,
    }
    return {
        key: value for key, env_variable in env_dict.items() if (value := os.getenv(env_variable, None)) is not None
    }

ENV_REDIS_DB: str = f'{ENV_PREFIX}_REDIS_DB' module-attribute

The environment variable name for the Redis database name.

ENV_REDIS_HOST: str = f'{ENV_PREFIX}_REDIS_HOST' module-attribute

The environment variable name for the Redis host.

ENV_REDIS_PASSWORD: str = f'{ENV_PREFIX}_REDIS_PASSWORD' module-attribute

The environment variable name for the Redis password.

ENV_REDIS_PORT: str = f'{ENV_PREFIX}_REDIS_PORT' module-attribute

The environment variable name for the Redis port.

ENV_REDIS_USERNAME: str = f'{ENV_PREFIX}_REDIS_USERNAME' module-attribute

The environment variable name for the Redis username.

MMispRedisConfigData

Bases: ConfigData

Encapsulates configuration data for the Redis connection.

Source code in src/mmisp/worker/misp_database/mmisp_redis_config.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class MMispRedisConfigData(ConfigData):
    """
    Encapsulates configuration data for the Redis connection.
    """

    host: str = "localhost"
    """The host of the Redis database."""
    port: int = 6379
    """The port of the Redis database."""
    db: int = 0
    """The database name of the Redis database."""
    username: str = ""
    """The username of the Redis database."""
    password: str = ""
    """The password of the Redis database."""

    def __init__(self: Self) -> None:
        super().__init__()
        self.read_from_env()

    def read_from_env(self: Self) -> None:
        """
        Reads the configuration from the environment.
        """

        env_dict: dict = {
            "host": ENV_REDIS_HOST,
            "port": ENV_REDIS_PORT,
            "db": ENV_REDIS_DB,
            "username": ENV_REDIS_USERNAME,
            "password": ENV_REDIS_PASSWORD,
        }

        for env in env_dict:
            value: str | None = os.environ.get(env_dict[env])
            if value:
                try:
                    setattr(self, env, value)
                except ValidationError as validation_error:
                    _log.exception(
                        f"{env_dict[env]}: Could not set value from environment variable. {validation_error}"
                    )

db: int = 0 class-attribute instance-attribute

The database name of the Redis database.

host: str = 'localhost' class-attribute instance-attribute

The host of the Redis database.

password: str = '' class-attribute instance-attribute

The password of the Redis database.

port: int = 6379 class-attribute instance-attribute

The port of the Redis database.

username: str = '' class-attribute instance-attribute

The username of the Redis database.

read_from_env()

Reads the configuration from the environment.

Source code in src/mmisp/worker/misp_database/mmisp_redis_config.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def read_from_env(self: Self) -> None:
    """
    Reads the configuration from the environment.
    """

    env_dict: dict = {
        "host": ENV_REDIS_HOST,
        "port": ENV_REDIS_PORT,
        "db": ENV_REDIS_DB,
        "username": ENV_REDIS_USERNAME,
        "password": ENV_REDIS_PASSWORD,
    }

    for env in env_dict:
        value: str | None = os.environ.get(env_dict[env])
        if value:
            try:
                setattr(self, env, value)
            except ValidationError as validation_error:
                _log.exception(
                    f"{env_dict[env]}: Could not set value from environment variable. {validation_error}"
                )

decode_json_response(response)

Decodes the JSON response from the MISP API

:param response: response from the MISP API :type response: Response :return: returns the decoded JSON response :rtype: dict

Source code in src/mmisp/worker/misp_database/misp_api_utils.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
def decode_json_response(response: Response) -> dict:
    """
    Decodes the JSON response from the MISP API

    :param response: response from the MISP API
    :type response: Response
    :return: returns the decoded JSON response
    :rtype: dict
    """
    response_dict: dict
    try:
        response_dict = response.json()
    except JSONDecodeError as json_error:
        print(response.text)
        raise InvalidAPIResponse(f"Invalid API response: {json_error}")

    return response_dict

translate_dictionary(dictionary, translation_dict)

translates the keys of a dictionary according to the translation dictionary

:param dictionary: dictionary to be translated :type dictionary: dict :param translation_dict: translation dictionary including the old key as the key and the new key as the value :type translation_dict: dict[str, str] :return: returns the translated dictionary :rtype: dict

Source code in src/mmisp/worker/misp_database/misp_api_utils.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def translate_dictionary(dictionary: dict, translation_dict: dict[str, str]) -> dict:
    """
    translates the keys of a dictionary according to the translation dictionary

    :param dictionary: dictionary to be translated
    :type dictionary: dict
    :param translation_dict: translation dictionary including the old key as the key and the new key as the value
    :type translation_dict: dict[str, str]
    :return: returns the translated dictionary
    :rtype: dict
    """
    translated_dict: dict = {}
    for key in dictionary.keys():
        if key in translation_dict:
            new_key: str = translation_dict[key]
            translated_dict[new_key] = dictionary[key]
        else:
            translated_dict[key] = dictionary[key]

    return translated_dict

MispAPI

This class is used to communicate with the MISP API.

it encapsulates the communication with the MISP API and provides methods to retrieve and send data. the data is parsed and validated by the MispAPIParser and MispAPIUtils classes, and returns the data as MMISP dataclasses.

Source code in src/mmisp/worker/misp_database/misp_api.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
class MispAPI:
    """
    This class is used to communicate with the MISP API.

    it encapsulates the communication with the MISP API and provides methods to retrieve and send data.
    the data is parsed and validated by the MispAPIParser and MispAPIUtils classes,
    and returns the data as MMISP dataclasses.
    """

    __HEADERS: dict = {"Accept": "application/json", "Content-Type": "application/json", "Authorization": ""}
    __LIMIT: int = 1000

    def __init__(self: Self, db: AsyncSession) -> None:
        self.__config: MispAPIConfigData = misp_api_config_data
        self._db = db

    def __setup_api_session(self: Self) -> Session:
        """
        This method is used to set up the session for the API.

        :return:  returns the session that was set up
        :rtype: Session
        """

        session = Session()
        session.headers.update(self.__HEADERS)
        session.headers.update({"Authorization": f"{self.__config.key}"})
        return session

    async def __setup_remote_api_session(self: Self, server_id: int) -> Session:
        """
        This method is used to set up the session for the remote API.

        :param server_id: server id of the remote server to set up the session for
        :type server_id: int
        :return: returns the session to the specified server that was set up
        :rtype: Session
        """

        key: str | None = await get_api_authkey(self._db, server_id)
        if key is None:
            raise APIException(f"API key for server {server_id} is not available.")

        session = Session()
        session.headers.update(self.__HEADERS)
        session.headers.update({"Authorization": f"{key}"})
        return session

    async def __get_session(self: Self, server: Server | None = None) -> Session:
        """
        This method is used to get the session for the given server_id
        if a session for the given server_id already exists, it returns the existing session,
        otherwise it sets up a new session and returns it.

        :param server: server to get the session for, if no server is given, the own API is used
        :type server: Server
        :return: returns a session to the specified server
        :rtype: Session
        """

        server_id: int = server.id if server is not None else 0
        if server_id == 0:
            return self.__setup_api_session()
        else:
            return await self.__setup_remote_api_session(server_id)

    def __get_url(self: Self, path: str, server: Server | None = None) -> str:
        """
        This method is used to get the url for the given server, adding the given path to the url.

        if no server is given, it uses the default url from the config,
        otherwise it uses the url of the given server.

        :param path: path to add to the url
        :type path: str
        :param server: remote server to get the url for
        :type server: Server
        :return: returns the url for the given server with the path added
        :rtype: str
        """
        url: str
        if server:
            url = server.url
        else:
            if self.__config.url.endswith("/"):
                url = self.__config.url[:-1]
            else:
                url = self.__config.url

        return self.__join_path(url, path)

    @staticmethod
    def __join_path(url: str, path: str) -> str:
        """
        This method is used to join the given path to the given url.
        it checks if the path starts with a slash, if it does not, it also adds a slash to the url.

        :param url: url to join the path to
        :type url: str
        :param path: path to join to the url
        :type path: str
        :return: returns the url with the path added
        :rtype: str
        """

        if path.startswith("/"):
            return url + path
        else:
            return f"{url}/{path}"

    async def __send_request(self: Self, request: PreparedRequest, server: Server | None = None, **kwargs) -> dict:
        """
        This method is used to send the given request and return the response.

        :param request: the request to send
        :type request: PreparedRequest
        :param kwargs: keyword arguments
        :type kwargs: dict[str, Any]
        :return: returns the response of the request
        :rtype: dict
        """
        print("Request is: ", request)
        print(request.method)
        print(request.url)
        if request.method == "POST":
            print(request.body)
        response: Response

        if "timeout" not in kwargs:
            kwargs["timeout"] = (self.__config.connect_timeout, self.__config.read_timeout)

        try:
            response = (await self.__get_session(server)).send(request, **kwargs)
        except (ConnectionError, TimeoutError, TooManyRedirects) as api_exception:
            _log.warning(f"API not available. The request could not be made. ==> {api_exception}")
            raise APIException(f"API not available. The request could not be made. ==> {api_exception}")

        try:
            response.raise_for_status()
        except requests.HTTPError as http_err:
            # Füge hier eine detaillierte Fehlerausgabe hinzu
            error_details = (
                f"HTTP Error occurred: {http_err}\n"
                f"URL: {request.url}\n"
                f"Status Code: {response.status_code}\n"
                f"Response Text: {response.text}\n"
                f"Headers: {response.headers}"
            )
            _log.error(error_details)
            raise APIException(error_details) from http_err

        if response.status_code != codes.ok:
            raise requests.HTTPError(response, response.text)

        return misp_api_utils.decode_json_response(response)

    async def get_user(self: Self, user_id: int, server: Server | None = None) -> MispUser:
        """
        Returns the user with the given user_id.

        :param user_id: id of the user
        :type user_id: int
        :param server: the server to get the user from, if no server is given, the own API is used
        :type server: Server
        :return: returns the user with the given user_id
        :rtype: MispUser
        """
        url: str = self.__get_url(f"/admin/users/view/{user_id}", server)

        request: Request = Request("GET", url)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
        response: dict = await self.__send_request(prepared_request, server)
        get_user_element_responds: GetUsersElement = GetUsersElement.parse_obj(response)
        user_dict: dict = get_user_element_responds.User.dict()
        user_dict["role"] = get_user_element_responds.Role.dict()

        try:
            return MispUser.parse_obj(user_dict)
        except ValueError as value_error:
            raise InvalidAPIResponse(f"Invalid API response. MISP user could not be parsed: {value_error}")

    async def get_object(self: Self, object_id: int, server: Server | None = None) -> ObjectWithAttributesResponse:
        """
        Returns the object with the given object_id.

        :param object_id:  id of the object
        :type object_id: int
        :param server: the server to get the object from, if no server is given, the own API is used
        :type server: Server
        :return: The object
        :rtype: ObjectWithAttributesResponse
        """
        if object_id == 0:
            #  for correlation to give back an empty object
            return ObjectWithAttributesResponse(id=0, uuid="", name="", distribution=4, sharing_group_id=0)

        url: str = self.__get_url(f"objects/view/{object_id}", server)

        request: Request = Request("GET", url)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
        response: dict = await self.__send_request(prepared_request, server)

        try:
            return ObjectResponse.parse_obj(response).Object
        except ValueError as value_error:
            raise InvalidAPIResponse(
                f"Invalid API response. MISP ObjectWithAttributesResponse could not be parsed: {value_error}"
            )

    async def get_sharing_group(
        self: Self, sharing_group_id: int, server: Server | None = None
    ) -> ViewUpdateSharingGroupLegacyResponse:
        """
        Returns the sharing group with the given sharing_group_id

        :param sharing_group_id: id of the sharing group to get from the API
        :type sharing_group_id: int
        :param server: the server to get the sharing group from, if no server is given, the own API is used
        :type server: Server
        :return: returns the sharing group that got requested
        :rtype: ViewUpdateSharingGroupLegacyResponse
        """

        url: str = self.__get_url(f"/sharing_groups/view/{sharing_group_id}", server)
        request: Request = Request("GET", url)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
        response: dict = await self.__send_request(prepared_request, server)
        try:
            return ViewUpdateSharingGroupLegacyResponse.parse_obj(response)
        except ValueError as value_error:
            raise InvalidAPIResponse(
                f"Invalid API response. MISP ViewUpdateSharingGroupLegacyResponse could not be parsed: {value_error}"
            )

    async def get_event(self: Self, event_id: int | UUID, server: Server | None = None) -> AddEditGetEventDetails:
        """
        Returns the event with the given event_id from the given server,
         the own API is used if no server is given.

        :param event_id: the id of the event to get
        :type event_id: int
        :param server: the server to get the event from, if no server is given, the own API is used
        :type server: Server
        :return: returns the event with the given event_id from the given server
        :rtype: AddEditGetEventDetails
        """
        url: str = self.__get_url(f"/events/view/{event_id}", server)
        request: Request = Request("GET", url)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
        response: dict = await self.__send_request(prepared_request, server)
        try:
            return AddEditGetEventDetails.parse_obj(response["Event"])
        except ValueError as value_error:
            raise InvalidAPIResponse(
                f"Invalid API response. AddEditGetEventDetails"
                f"{json.dumps(response['Event'])} could not be parsed: {value_error}"
            )

    async def get_sharing_groups(
        self: Self, server: Server | None = None
    ) -> list[GetAllSharingGroupsResponseResponseItem]:
        """
        Returns all sharing groups from the given server, if no server is given, the own API is used.

        :param server: the server to get the sharing groups from, if no server is given, the own API is used
        :type server: Server
        :return: returns all sharing groups from the given server
        :rtype: list[GetAllSharingGroupsResponseResponseItem]
        """
        url: str = self.__get_url("/sharing_groups", server)

        request: Request = Request("GET", url)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
        response: dict = await self.__send_request(prepared_request, server)
        print(f"get_sharing_groups: response={response}")

        try:
            return GetAllSharingGroupsResponse.parse_obj(response).response
        except ValueError as value_error:
            raise InvalidAPIResponse(f"Invalid API response. MISP Sharing Group could not be parsed: {value_error}")

    async def get_attribute(self: Self, attribute_id: int, server: Server | None = None) -> GetAttributeAttributes:
        """
        Returns the attribute with the given attribute_id.

        :param attribute_id: the id of the attribute to get
        :type attribute_id: int
        :param server: the server to get the attribute from, if no server is given, the own API is used
        :type server: Server
        :return: returns the attribute with the given attribute_id
        :rtype: GetAttributeAttributes
        """

        url: str = self.__get_url(f"/attributes/{attribute_id}", server)

        request: Request = Request("GET", url)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
        response: dict = await self.__send_request(prepared_request, server)

        try:
            return GetAttributeResponse.parse_obj(response).Attribute
        except ValueError as value_error:
            raise InvalidAPIResponse(f"Invalid API response. MISP Attribute could not be parsed: {value_error}")

    async def get_event_attributes(
        self: Self, event_id: int, server: Server | None = None
    ) -> list[SearchAttributesAttributesDetails]:
        """
        Returns all attribute object of the given event, represented by given event_id.

        :param event_id: of the event
        :type event_id: int
        :param server: the server to get the attribute from, if no server is given, the own API is used
        :type server: Server
        :return: a list of all attributes
        :rtype: list[SearchAttributesAttributesDetails]
        """

        url: str = self.__get_url("/attributes/restSearch", server)
        body: SearchAttributesBody = SearchAttributesBody(
            eventid=event_id, with_attachments=True, include_event_uuid=True
        )
        request: Request = Request("POST", url, json=body.json(by_alias=True))
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
        response: dict = await self.__send_request(prepared_request, server)

        try:
            return SearchAttributesResponse.parse_obj(response).response.Attribute
        except ValueError as value_error:
            raise InvalidAPIResponse(f"Invalid API response. Event Attributes could not be parsed: {value_error}")

    async def create_attribute(self: Self, attribute: AddAttributeBody, server: Server | None = None) -> int:
        """
        creates the given attribute on the server

        :param attribute: contains the required attributes to creat an attribute
        :type attribute: AddAttributeBody
        :param server: the server to create the attribute on, if no server is given, the own API is used
        :type server: Server
        :return: The attribute id if the creation was successful. -1 otherwise.
        :rtype: int
        """
        if attribute.uuid is None:
            attribute.uuid = uuid()

        if attribute.deleted is None:
            attribute.deleted = False

        url: str = self.__get_url(f"/attributes/add/{attribute.event_id}", server)

        request: Request = Request("POST", url, data=attribute.json())
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
        response: dict = await self.__send_request(prepared_request, server)
        if "Attribute" in response:
            return int(response["Attribute"]["id"])

        return -1

    async def create_tag(self: Self, tag: TagCreateBody, server: Server | None = None) -> int:
        """
        Creates the given tag on the server
        :param tag: The tag to create.
        :type tag: TagCreateBody
        :param server: The server to create the tag on. If no server is given, the own MMISP-API Server is used.
        :type server: Server
        :return: the id of the created tag
        :rtype: int
        """

        url: str = self.__get_url("/tags/add", server)
        request: Request = Request("POST", url, data=tag.json())
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)

        response: dict = await self.__send_request(prepared_request, server)
        return int(response["Tag"]["id"])

    async def attach_attribute_tag(
        self: Self, attribute_id: int, tag_id: int, local: bool, server: Server | None = None
    ) -> bool:
        """
        Attaches a tag to an attribute

        :param attribute_id: The ID of the attribute.
        :type attribute_id: int
        :param tag_id: The ID of the tag.
        :type tag_id: int
        :param local: If the tag is to be attached only locally.
        :type local: bool
        :param server: the server to attach the tag to the attribute on, if no server is given, the own API is used
        :type server: Server
        :return: true if the attachment was successful
        :rtype: bool
        """

        url: str = self.__get_url(
            f"/attributes/addTag/{attribute_id}/{tag_id}/local:{local}",
            server,
        )
        request: Request = Request("POST", url)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
        await self.__send_request(prepared_request, server)

        return True

    async def attach_event_tag(
        self: Self, event_id: int, tag_id: int, local: bool, server: Server | None = None
    ) -> bool:
        """
        Attaches a tag to an event

        :param event_id: The ID of the event.
        :type event_id: int
        :param tag_id: The ID of the tag.
        :type tag_id: int
        :param local: If the tag is to be attached only locally.
        :type local: bool
        :param server: the server to attach the tag to the event on, if no server is given, the own API is used
        :type server: Server
        :return:
        :rtype: bool
        """

        url: str = self.__get_url(f"/events/addTag/{event_id}/{tag_id}/local:{local}", server)
        request: Request = Request("POST", url)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)

        await self.__send_request(prepared_request, server)
        return True

    async def modify_event_tag_relationship(
        self: Self, event_tag_id: int, relationship_type: str, server: Server | None = None
    ) -> bool:
        """
        Modifies the relationship of the given tag to the given event
        Endpoint documented at: https://www.misp-project.org/2022/10/10/MISP.2.4.164.released.html/

        :param event_tag_id: The ID of the event-tag assignment.
        :type event_tag_id: int
        :param relationship_type: The relationship type to set.
        :type relationship_type: str
        :param server: the server to modify the relationship on, if no server is given, the own API is used
        :type server: Server
        :return: returns true if the modification was successful
        :rtype: bool
        """

        url: str = self.__get_url(f"/tags/modifyTagRelationship/event/{event_tag_id}", server)
        body: dict = {"Tag": {"relationship_type": relationship_type}}

        request: Request = Request("POST", url, json=body)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)

        response: dict = await self.__send_request(prepared_request, server)
        print(response)
        return response["saved"] == "true" and response["success"] == "true"

    async def modify_attribute_tag_relationship(
        self: Self, attribute_tag_id: int, relationship_type: str, server: Server | None = None
    ) -> bool:
        """
        Modifies the relationship of the given tag to the given attribute
        Endpoint documented at: https://www.misp-project.org/2022/10/10/MISP.2.4.164.released.html/

        :param attribute_tag_id: The ID of the attribute-tag assignment.
        :type attribute_tag_id: int
        :param relationship_type: The relationship type to set.
        :type relationship_type: str
        :param server: the server to modify the relationship on, if no server is given, the own API is used
        :type server: Server
        :return: returns true if the modification was successful
        :rtype: bool
        """

        url: str = self.__get_url(f"/tags/modifyTagRelationship/attribute/{attribute_tag_id}", server)
        body = {"Tag": {"relationship_type": relationship_type}}

        request: Request = Request("POST", url, json=body)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)

        response: dict = await self.__send_request(prepared_request, server)
        print(f"bananenbieger: modify_attribute_tag_relationship: response={response}")
        return response["saved"] is True and response["success"] is True

__get_session(server=None) async

This method is used to get the session for the given server_id if a session for the given server_id already exists, it returns the existing session, otherwise it sets up a new session and returns it.

:param server: server to get the session for, if no server is given, the own API is used :type server: Server :return: returns a session to the specified server :rtype: Session

Source code in src/mmisp/worker/misp_database/misp_api.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
async def __get_session(self: Self, server: Server | None = None) -> Session:
    """
    This method is used to get the session for the given server_id
    if a session for the given server_id already exists, it returns the existing session,
    otherwise it sets up a new session and returns it.

    :param server: server to get the session for, if no server is given, the own API is used
    :type server: Server
    :return: returns a session to the specified server
    :rtype: Session
    """

    server_id: int = server.id if server is not None else 0
    if server_id == 0:
        return self.__setup_api_session()
    else:
        return await self.__setup_remote_api_session(server_id)

__get_url(path, server=None)

This method is used to get the url for the given server, adding the given path to the url.

if no server is given, it uses the default url from the config, otherwise it uses the url of the given server.

:param path: path to add to the url :type path: str :param server: remote server to get the url for :type server: Server :return: returns the url for the given server with the path added :rtype: str

Source code in src/mmisp/worker/misp_database/misp_api.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def __get_url(self: Self, path: str, server: Server | None = None) -> str:
    """
    This method is used to get the url for the given server, adding the given path to the url.

    if no server is given, it uses the default url from the config,
    otherwise it uses the url of the given server.

    :param path: path to add to the url
    :type path: str
    :param server: remote server to get the url for
    :type server: Server
    :return: returns the url for the given server with the path added
    :rtype: str
    """
    url: str
    if server:
        url = server.url
    else:
        if self.__config.url.endswith("/"):
            url = self.__config.url[:-1]
        else:
            url = self.__config.url

    return self.__join_path(url, path)

__join_path(url, path) staticmethod

This method is used to join the given path to the given url. it checks if the path starts with a slash, if it does not, it also adds a slash to the url.

:param url: url to join the path to :type url: str :param path: path to join to the url :type path: str :return: returns the url with the path added :rtype: str

Source code in src/mmisp/worker/misp_database/misp_api.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
@staticmethod
def __join_path(url: str, path: str) -> str:
    """
    This method is used to join the given path to the given url.
    it checks if the path starts with a slash, if it does not, it also adds a slash to the url.

    :param url: url to join the path to
    :type url: str
    :param path: path to join to the url
    :type path: str
    :return: returns the url with the path added
    :rtype: str
    """

    if path.startswith("/"):
        return url + path
    else:
        return f"{url}/{path}"

__send_request(request, server=None, **kwargs) async

This method is used to send the given request and return the response.

:param request: the request to send :type request: PreparedRequest :param kwargs: keyword arguments :type kwargs: dict[str, Any] :return: returns the response of the request :rtype: dict

Source code in src/mmisp/worker/misp_database/misp_api.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
async def __send_request(self: Self, request: PreparedRequest, server: Server | None = None, **kwargs) -> dict:
    """
    This method is used to send the given request and return the response.

    :param request: the request to send
    :type request: PreparedRequest
    :param kwargs: keyword arguments
    :type kwargs: dict[str, Any]
    :return: returns the response of the request
    :rtype: dict
    """
    print("Request is: ", request)
    print(request.method)
    print(request.url)
    if request.method == "POST":
        print(request.body)
    response: Response

    if "timeout" not in kwargs:
        kwargs["timeout"] = (self.__config.connect_timeout, self.__config.read_timeout)

    try:
        response = (await self.__get_session(server)).send(request, **kwargs)
    except (ConnectionError, TimeoutError, TooManyRedirects) as api_exception:
        _log.warning(f"API not available. The request could not be made. ==> {api_exception}")
        raise APIException(f"API not available. The request could not be made. ==> {api_exception}")

    try:
        response.raise_for_status()
    except requests.HTTPError as http_err:
        # Füge hier eine detaillierte Fehlerausgabe hinzu
        error_details = (
            f"HTTP Error occurred: {http_err}\n"
            f"URL: {request.url}\n"
            f"Status Code: {response.status_code}\n"
            f"Response Text: {response.text}\n"
            f"Headers: {response.headers}"
        )
        _log.error(error_details)
        raise APIException(error_details) from http_err

    if response.status_code != codes.ok:
        raise requests.HTTPError(response, response.text)

    return misp_api_utils.decode_json_response(response)

__setup_api_session()

This method is used to set up the session for the API.

:return: returns the session that was set up :rtype: Session

Source code in src/mmisp/worker/misp_database/misp_api.py
54
55
56
57
58
59
60
61
62
63
64
65
def __setup_api_session(self: Self) -> Session:
    """
    This method is used to set up the session for the API.

    :return:  returns the session that was set up
    :rtype: Session
    """

    session = Session()
    session.headers.update(self.__HEADERS)
    session.headers.update({"Authorization": f"{self.__config.key}"})
    return session

__setup_remote_api_session(server_id) async

This method is used to set up the session for the remote API.

:param server_id: server id of the remote server to set up the session for :type server_id: int :return: returns the session to the specified server that was set up :rtype: Session

Source code in src/mmisp/worker/misp_database/misp_api.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
async def __setup_remote_api_session(self: Self, server_id: int) -> Session:
    """
    This method is used to set up the session for the remote API.

    :param server_id: server id of the remote server to set up the session for
    :type server_id: int
    :return: returns the session to the specified server that was set up
    :rtype: Session
    """

    key: str | None = await get_api_authkey(self._db, server_id)
    if key is None:
        raise APIException(f"API key for server {server_id} is not available.")

    session = Session()
    session.headers.update(self.__HEADERS)
    session.headers.update({"Authorization": f"{key}"})
    return session

attach_attribute_tag(attribute_id, tag_id, local, server=None) async

Attaches a tag to an attribute

:param attribute_id: The ID of the attribute. :type attribute_id: int :param tag_id: The ID of the tag. :type tag_id: int :param local: If the tag is to be attached only locally. :type local: bool :param server: the server to attach the tag to the attribute on, if no server is given, the own API is used :type server: Server :return: true if the attachment was successful :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_api.py
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
async def attach_attribute_tag(
    self: Self, attribute_id: int, tag_id: int, local: bool, server: Server | None = None
) -> bool:
    """
    Attaches a tag to an attribute

    :param attribute_id: The ID of the attribute.
    :type attribute_id: int
    :param tag_id: The ID of the tag.
    :type tag_id: int
    :param local: If the tag is to be attached only locally.
    :type local: bool
    :param server: the server to attach the tag to the attribute on, if no server is given, the own API is used
    :type server: Server
    :return: true if the attachment was successful
    :rtype: bool
    """

    url: str = self.__get_url(
        f"/attributes/addTag/{attribute_id}/{tag_id}/local:{local}",
        server,
    )
    request: Request = Request("POST", url)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
    await self.__send_request(prepared_request, server)

    return True

attach_event_tag(event_id, tag_id, local, server=None) async

Attaches a tag to an event

:param event_id: The ID of the event. :type event_id: int :param tag_id: The ID of the tag. :type tag_id: int :param local: If the tag is to be attached only locally. :type local: bool :param server: the server to attach the tag to the event on, if no server is given, the own API is used :type server: Server :return: :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_api.py
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
async def attach_event_tag(
    self: Self, event_id: int, tag_id: int, local: bool, server: Server | None = None
) -> bool:
    """
    Attaches a tag to an event

    :param event_id: The ID of the event.
    :type event_id: int
    :param tag_id: The ID of the tag.
    :type tag_id: int
    :param local: If the tag is to be attached only locally.
    :type local: bool
    :param server: the server to attach the tag to the event on, if no server is given, the own API is used
    :type server: Server
    :return:
    :rtype: bool
    """

    url: str = self.__get_url(f"/events/addTag/{event_id}/{tag_id}/local:{local}", server)
    request: Request = Request("POST", url)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)

    await self.__send_request(prepared_request, server)
    return True

create_attribute(attribute, server=None) async

creates the given attribute on the server

:param attribute: contains the required attributes to creat an attribute :type attribute: AddAttributeBody :param server: the server to create the attribute on, if no server is given, the own API is used :type server: Server :return: The attribute id if the creation was successful. -1 otherwise. :rtype: int

Source code in src/mmisp/worker/misp_database/misp_api.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
async def create_attribute(self: Self, attribute: AddAttributeBody, server: Server | None = None) -> int:
    """
    creates the given attribute on the server

    :param attribute: contains the required attributes to creat an attribute
    :type attribute: AddAttributeBody
    :param server: the server to create the attribute on, if no server is given, the own API is used
    :type server: Server
    :return: The attribute id if the creation was successful. -1 otherwise.
    :rtype: int
    """
    if attribute.uuid is None:
        attribute.uuid = uuid()

    if attribute.deleted is None:
        attribute.deleted = False

    url: str = self.__get_url(f"/attributes/add/{attribute.event_id}", server)

    request: Request = Request("POST", url, data=attribute.json())
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
    response: dict = await self.__send_request(prepared_request, server)
    if "Attribute" in response:
        return int(response["Attribute"]["id"])

    return -1

create_tag(tag, server=None) async

Creates the given tag on the server :param tag: The tag to create. :type tag: TagCreateBody :param server: The server to create the tag on. If no server is given, the own MMISP-API Server is used. :type server: Server :return: the id of the created tag :rtype: int

Source code in src/mmisp/worker/misp_database/misp_api.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
async def create_tag(self: Self, tag: TagCreateBody, server: Server | None = None) -> int:
    """
    Creates the given tag on the server
    :param tag: The tag to create.
    :type tag: TagCreateBody
    :param server: The server to create the tag on. If no server is given, the own MMISP-API Server is used.
    :type server: Server
    :return: the id of the created tag
    :rtype: int
    """

    url: str = self.__get_url("/tags/add", server)
    request: Request = Request("POST", url, data=tag.json())
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)

    response: dict = await self.__send_request(prepared_request, server)
    return int(response["Tag"]["id"])

get_attribute(attribute_id, server=None) async

Returns the attribute with the given attribute_id.

:param attribute_id: the id of the attribute to get :type attribute_id: int :param server: the server to get the attribute from, if no server is given, the own API is used :type server: Server :return: returns the attribute with the given attribute_id :rtype: GetAttributeAttributes

Source code in src/mmisp/worker/misp_database/misp_api.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
async def get_attribute(self: Self, attribute_id: int, server: Server | None = None) -> GetAttributeAttributes:
    """
    Returns the attribute with the given attribute_id.

    :param attribute_id: the id of the attribute to get
    :type attribute_id: int
    :param server: the server to get the attribute from, if no server is given, the own API is used
    :type server: Server
    :return: returns the attribute with the given attribute_id
    :rtype: GetAttributeAttributes
    """

    url: str = self.__get_url(f"/attributes/{attribute_id}", server)

    request: Request = Request("GET", url)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
    response: dict = await self.__send_request(prepared_request, server)

    try:
        return GetAttributeResponse.parse_obj(response).Attribute
    except ValueError as value_error:
        raise InvalidAPIResponse(f"Invalid API response. MISP Attribute could not be parsed: {value_error}")

get_event(event_id, server=None) async

Returns the event with the given event_id from the given server, the own API is used if no server is given.

:param event_id: the id of the event to get :type event_id: int :param server: the server to get the event from, if no server is given, the own API is used :type server: Server :return: returns the event with the given event_id from the given server :rtype: AddEditGetEventDetails

Source code in src/mmisp/worker/misp_database/misp_api.py
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
async def get_event(self: Self, event_id: int | UUID, server: Server | None = None) -> AddEditGetEventDetails:
    """
    Returns the event with the given event_id from the given server,
     the own API is used if no server is given.

    :param event_id: the id of the event to get
    :type event_id: int
    :param server: the server to get the event from, if no server is given, the own API is used
    :type server: Server
    :return: returns the event with the given event_id from the given server
    :rtype: AddEditGetEventDetails
    """
    url: str = self.__get_url(f"/events/view/{event_id}", server)
    request: Request = Request("GET", url)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
    response: dict = await self.__send_request(prepared_request, server)
    try:
        return AddEditGetEventDetails.parse_obj(response["Event"])
    except ValueError as value_error:
        raise InvalidAPIResponse(
            f"Invalid API response. AddEditGetEventDetails"
            f"{json.dumps(response['Event'])} could not be parsed: {value_error}"
        )

get_event_attributes(event_id, server=None) async

Returns all attribute object of the given event, represented by given event_id.

:param event_id: of the event :type event_id: int :param server: the server to get the attribute from, if no server is given, the own API is used :type server: Server :return: a list of all attributes :rtype: list[SearchAttributesAttributesDetails]

Source code in src/mmisp/worker/misp_database/misp_api.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
async def get_event_attributes(
    self: Self, event_id: int, server: Server | None = None
) -> list[SearchAttributesAttributesDetails]:
    """
    Returns all attribute object of the given event, represented by given event_id.

    :param event_id: of the event
    :type event_id: int
    :param server: the server to get the attribute from, if no server is given, the own API is used
    :type server: Server
    :return: a list of all attributes
    :rtype: list[SearchAttributesAttributesDetails]
    """

    url: str = self.__get_url("/attributes/restSearch", server)
    body: SearchAttributesBody = SearchAttributesBody(
        eventid=event_id, with_attachments=True, include_event_uuid=True
    )
    request: Request = Request("POST", url, json=body.json(by_alias=True))
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
    response: dict = await self.__send_request(prepared_request, server)

    try:
        return SearchAttributesResponse.parse_obj(response).response.Attribute
    except ValueError as value_error:
        raise InvalidAPIResponse(f"Invalid API response. Event Attributes could not be parsed: {value_error}")

get_object(object_id, server=None) async

Returns the object with the given object_id.

:param object_id: id of the object :type object_id: int :param server: the server to get the object from, if no server is given, the own API is used :type server: Server :return: The object :rtype: ObjectWithAttributesResponse

Source code in src/mmisp/worker/misp_database/misp_api.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
async def get_object(self: Self, object_id: int, server: Server | None = None) -> ObjectWithAttributesResponse:
    """
    Returns the object with the given object_id.

    :param object_id:  id of the object
    :type object_id: int
    :param server: the server to get the object from, if no server is given, the own API is used
    :type server: Server
    :return: The object
    :rtype: ObjectWithAttributesResponse
    """
    if object_id == 0:
        #  for correlation to give back an empty object
        return ObjectWithAttributesResponse(id=0, uuid="", name="", distribution=4, sharing_group_id=0)

    url: str = self.__get_url(f"objects/view/{object_id}", server)

    request: Request = Request("GET", url)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
    response: dict = await self.__send_request(prepared_request, server)

    try:
        return ObjectResponse.parse_obj(response).Object
    except ValueError as value_error:
        raise InvalidAPIResponse(
            f"Invalid API response. MISP ObjectWithAttributesResponse could not be parsed: {value_error}"
        )

get_sharing_group(sharing_group_id, server=None) async

Returns the sharing group with the given sharing_group_id

:param sharing_group_id: id of the sharing group to get from the API :type sharing_group_id: int :param server: the server to get the sharing group from, if no server is given, the own API is used :type server: Server :return: returns the sharing group that got requested :rtype: ViewUpdateSharingGroupLegacyResponse

Source code in src/mmisp/worker/misp_database/misp_api.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
async def get_sharing_group(
    self: Self, sharing_group_id: int, server: Server | None = None
) -> ViewUpdateSharingGroupLegacyResponse:
    """
    Returns the sharing group with the given sharing_group_id

    :param sharing_group_id: id of the sharing group to get from the API
    :type sharing_group_id: int
    :param server: the server to get the sharing group from, if no server is given, the own API is used
    :type server: Server
    :return: returns the sharing group that got requested
    :rtype: ViewUpdateSharingGroupLegacyResponse
    """

    url: str = self.__get_url(f"/sharing_groups/view/{sharing_group_id}", server)
    request: Request = Request("GET", url)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
    response: dict = await self.__send_request(prepared_request, server)
    try:
        return ViewUpdateSharingGroupLegacyResponse.parse_obj(response)
    except ValueError as value_error:
        raise InvalidAPIResponse(
            f"Invalid API response. MISP ViewUpdateSharingGroupLegacyResponse could not be parsed: {value_error}"
        )

get_sharing_groups(server=None) async

Returns all sharing groups from the given server, if no server is given, the own API is used.

:param server: the server to get the sharing groups from, if no server is given, the own API is used :type server: Server :return: returns all sharing groups from the given server :rtype: list[GetAllSharingGroupsResponseResponseItem]

Source code in src/mmisp/worker/misp_database/misp_api.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
async def get_sharing_groups(
    self: Self, server: Server | None = None
) -> list[GetAllSharingGroupsResponseResponseItem]:
    """
    Returns all sharing groups from the given server, if no server is given, the own API is used.

    :param server: the server to get the sharing groups from, if no server is given, the own API is used
    :type server: Server
    :return: returns all sharing groups from the given server
    :rtype: list[GetAllSharingGroupsResponseResponseItem]
    """
    url: str = self.__get_url("/sharing_groups", server)

    request: Request = Request("GET", url)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
    response: dict = await self.__send_request(prepared_request, server)
    print(f"get_sharing_groups: response={response}")

    try:
        return GetAllSharingGroupsResponse.parse_obj(response).response
    except ValueError as value_error:
        raise InvalidAPIResponse(f"Invalid API response. MISP Sharing Group could not be parsed: {value_error}")

get_user(user_id, server=None) async

Returns the user with the given user_id.

:param user_id: id of the user :type user_id: int :param server: the server to get the user from, if no server is given, the own API is used :type server: Server :return: returns the user with the given user_id :rtype: MispUser

Source code in src/mmisp/worker/misp_database/misp_api.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
async def get_user(self: Self, user_id: int, server: Server | None = None) -> MispUser:
    """
    Returns the user with the given user_id.

    :param user_id: id of the user
    :type user_id: int
    :param server: the server to get the user from, if no server is given, the own API is used
    :type server: Server
    :return: returns the user with the given user_id
    :rtype: MispUser
    """
    url: str = self.__get_url(f"/admin/users/view/{user_id}", server)

    request: Request = Request("GET", url)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
    response: dict = await self.__send_request(prepared_request, server)
    get_user_element_responds: GetUsersElement = GetUsersElement.parse_obj(response)
    user_dict: dict = get_user_element_responds.User.dict()
    user_dict["role"] = get_user_element_responds.Role.dict()

    try:
        return MispUser.parse_obj(user_dict)
    except ValueError as value_error:
        raise InvalidAPIResponse(f"Invalid API response. MISP user could not be parsed: {value_error}")

modify_attribute_tag_relationship(attribute_tag_id, relationship_type, server=None) async

Modifies the relationship of the given tag to the given attribute Endpoint documented at: https://www.misp-project.org/2022/10/10/MISP.2.4.164.released.html/

:param attribute_tag_id: The ID of the attribute-tag assignment. :type attribute_tag_id: int :param relationship_type: The relationship type to set. :type relationship_type: str :param server: the server to modify the relationship on, if no server is given, the own API is used :type server: Server :return: returns true if the modification was successful :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_api.py
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
async def modify_attribute_tag_relationship(
    self: Self, attribute_tag_id: int, relationship_type: str, server: Server | None = None
) -> bool:
    """
    Modifies the relationship of the given tag to the given attribute
    Endpoint documented at: https://www.misp-project.org/2022/10/10/MISP.2.4.164.released.html/

    :param attribute_tag_id: The ID of the attribute-tag assignment.
    :type attribute_tag_id: int
    :param relationship_type: The relationship type to set.
    :type relationship_type: str
    :param server: the server to modify the relationship on, if no server is given, the own API is used
    :type server: Server
    :return: returns true if the modification was successful
    :rtype: bool
    """

    url: str = self.__get_url(f"/tags/modifyTagRelationship/attribute/{attribute_tag_id}", server)
    body = {"Tag": {"relationship_type": relationship_type}}

    request: Request = Request("POST", url, json=body)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)

    response: dict = await self.__send_request(prepared_request, server)
    print(f"bananenbieger: modify_attribute_tag_relationship: response={response}")
    return response["saved"] is True and response["success"] is True

modify_event_tag_relationship(event_tag_id, relationship_type, server=None) async

Modifies the relationship of the given tag to the given event Endpoint documented at: https://www.misp-project.org/2022/10/10/MISP.2.4.164.released.html/

:param event_tag_id: The ID of the event-tag assignment. :type event_tag_id: int :param relationship_type: The relationship type to set. :type relationship_type: str :param server: the server to modify the relationship on, if no server is given, the own API is used :type server: Server :return: returns true if the modification was successful :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_api.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
async def modify_event_tag_relationship(
    self: Self, event_tag_id: int, relationship_type: str, server: Server | None = None
) -> bool:
    """
    Modifies the relationship of the given tag to the given event
    Endpoint documented at: https://www.misp-project.org/2022/10/10/MISP.2.4.164.released.html/

    :param event_tag_id: The ID of the event-tag assignment.
    :type event_tag_id: int
    :param relationship_type: The relationship type to set.
    :type relationship_type: str
    :param server: the server to modify the relationship on, if no server is given, the own API is used
    :type server: Server
    :return: returns true if the modification was successful
    :rtype: bool
    """

    url: str = self.__get_url(f"/tags/modifyTagRelationship/event/{event_tag_id}", server)
    body: dict = {"Tag": {"relationship_type": relationship_type}}

    request: Request = Request("POST", url, json=body)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)

    response: dict = await self.__send_request(prepared_request, server)
    print(response)
    return response["saved"] == "true" and response["success"] == "true"

MMispRedis

Encapsulates the connection to the MMISP Redis database.

Source code in src/mmisp/worker/misp_database/mmisp_redis.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class MMispRedis:
    """
    Encapsulates the connection to the MMISP Redis database.
    """

    def __init__(self: Self, config: MMispRedisConfigData = mmisp_redis_config_data) -> None:
        self._config: MMispRedisConfigData = config
        self._redis_connection = redis.Redis(
            host=self._config.host,
            port=self._config.port,
            db=self._config.db,
            username=self._config.username,
            password=self._config.password,
            decode_responses=True,
        )

    async def get_enqueued_celery_tasks(self: Self, queue: str) -> int:
        """
        Returns the number of enqueued celery tasks in the given queue.
        :param queue: The queue name.
        :type queue: str
        """
        llen = self._redis_connection.llen(queue)
        if isawaitable(llen):
            return await llen
        else:
            assert isinstance(llen, int)
            return llen

get_enqueued_celery_tasks(queue) async

Returns the number of enqueued celery tasks in the given queue. :param queue: The queue name. :type queue: str

Source code in src/mmisp/worker/misp_database/mmisp_redis.py
25
26
27
28
29
30
31
32
33
34
35
36
async def get_enqueued_celery_tasks(self: Self, queue: str) -> int:
    """
    Returns the number of enqueued celery tasks in the given queue.
    :param queue: The queue name.
    :type queue: str
    """
    llen = self._redis_connection.llen(queue)
    if isawaitable(llen):
        return await llen
    else:
        assert isinstance(llen, int)
        return llen

helper module to interact with misp database

add_correlation_value(session, value) async

Adds a new value to correlation_values table or returns the id of the current entry with the same value. :param value: to add or get id of in the correlation_values table :type value: str :return: the id of the value in the correlation_values table :rtype: int

Source code in src/mmisp/worker/misp_database/misp_sql.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
async def add_correlation_value(session: AsyncSession, value: str) -> int:
    """
    Adds a new value to correlation_values table or returns the id of the current entry with the same value.
    :param value: to add or get id of in the correlation_values table
    :type value: str
    :return: the id of the value in the correlation_values table
    :rtype: int
    """
    statement = select(CorrelationValue).where(CorrelationValue.value == value)
    result: CorrelationValue | None = (await session.execute(statement)).scalars().first()
    if not result:
        new_value: CorrelationValue = CorrelationValue(value=value)
        session.add(new_value)
        await session.commit()
        await session.refresh(new_value)
        return new_value.id
    else:
        return result.id

add_correlations(session, correlations) async

Adds a list of correlations to the database. Returns True if at least one correlation was added, False otherwise. Doesn't add correlations that are already in the database. :param correlations: list of correlations to add :type correlations: list[DefaultCorrelation] :return: true if at least one correlation was added, false otherwise :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_sql.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
async def add_correlations(session: AsyncSession, correlations: list[DefaultCorrelation]) -> bool:
    """
    Adds a list of correlations to the database. Returns True if at least one correlation was added,
    False otherwise.
    Doesn't add correlations that are already in the database.
    :param correlations: list of correlations to add
    :type correlations: list[DefaultCorrelation]
    :return: true if at least one correlation was added, false otherwise
    :rtype: bool
    """
    changed: bool = False
    for correlation in correlations:
        attribute_id1 = correlation.attribute_id
        attribute_id2 = correlation.attribute_id_1
        search_statement_1 = select(DefaultCorrelation.id).where(
            and_(
                DefaultCorrelation.attribute_id == attribute_id1,
                DefaultCorrelation.attribute_id_1 == attribute_id2,
            )
        )
        search_statement_2 = select(DefaultCorrelation.id).where(
            and_(
                DefaultCorrelation.attribute_id == attribute_id2,
                DefaultCorrelation.attribute_id_1 == attribute_id1,
            )
        )
        search_result_1: int | None = (await session.execute(search_statement_1)).scalars().first()
        search_result_2: int | None = (await session.execute(search_statement_2)).scalars().first()

        if search_result_1 or search_result_2:
            continue
        session.add(correlation)
        changed = True
    if changed:
        await session.commit()
    return changed

add_over_correlating_value(session, value, count) async

Adds a new value to over_correlating_values table or updates the current entry with the same value. Returns True if value was added or updated, False otherwise. :param value: add or update :type value: str :param count: occurrence of value :type count: int :return: True if value was added or updated, False otherwise :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_sql.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
async def add_over_correlating_value(session: AsyncSession, value: str, count: int) -> bool:
    """
    Adds a new value to over_correlating_values table or updates the current entry with the same value.
    Returns True if value was added or updated, False otherwise.
    :param value: add or update
    :type value: str
    :param count: occurrence of value
    :type count: int
    :return: True if value was added or updated, False otherwise
    :rtype: bool
    """
    statement = select(OverCorrelatingValue).where(OverCorrelatingValue.value == value)
    result: OverCorrelatingValue | None = (await session.execute(statement)).scalars().first()
    if result is not None:
        result.occurrence = count
        session.add(result)
    else:
        ocv: OverCorrelatingValue = OverCorrelatingValue(value=value, occurrence=count)
        session.add(ocv)

    await session.commit()
    return True

delete_correlations(session, value) async

Deletes all correlations with value from database. Returns True if value was in database, False otherwise. :param value: to delete the correlations of :type value: str :return: True if value was in database, False otherwise :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_sql.py
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
async def delete_correlations(session: AsyncSession, value: str) -> bool:
    """
    Deletes all correlations with value from database. Returns True if value was in database, False otherwise.
    :param value: to delete the correlations of
    :type value: str
    :return: True if value was in database, False otherwise
    :rtype: bool
    """
    statement_value_id = select(CorrelationValue).where(CorrelationValue.value == value)
    correlation_value: CorrelationValue | None = (await session.execute(statement_value_id)).scalars().first()

    if correlation_value:
        delete_statement_value = delete(CorrelationValue).where(CorrelationValue.value == value)
        await session.execute(delete_statement_value)

        delete_statement_correlations = delete(DefaultCorrelation).where(
            DefaultCorrelation.value_id == correlation_value.id
        )
        await session.execute(delete_statement_correlations)

        return True
    else:
        return False

delete_over_correlating_value(session, value) async

Deletes value from over_correlating_values table. Returns True if value was in table, False otherwise. :param value: row to delete :type value: str :return: true if value was in table, false otherwise :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_sql.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
async def delete_over_correlating_value(session: AsyncSession, value: str) -> bool:
    """
    Deletes value from over_correlating_values table. Returns True if value was in table, False otherwise.
    :param value: row to delete
    :type value: str
    :return: true if value was in table, false otherwise
    :rtype: bool
    """
    result = await is_over_correlating_value(session, value)
    if result:
        statement = delete(OverCorrelatingValue).where(OverCorrelatingValue.value == value)
        await session.execute(statement)
        return True
    return False

filter_blocked_clusters(session, clusters) async

Get all blocked clusters from database and remove them from clusters list. :param clusters: list of clusters to check :type clusters: list[GetGalaxyClusterResponse] :return: list without blocked clusters :rtype: list[MispGalaxyCluster]

Source code in src/mmisp/worker/misp_database/misp_sql.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
async def filter_blocked_clusters(
    session: AsyncSession, clusters: list[GetGalaxyClusterResponse]
) -> list[GetGalaxyClusterResponse]:
    """
    Get all blocked clusters from database and remove them from clusters list.
    :param clusters: list of clusters to check
    :type clusters: list[GetGalaxyClusterResponse]
    :return: list without blocked clusters
    :rtype: list[MispGalaxyCluster]
    """
    for cluster in clusters:
        statement = select(GalaxyClusterBlocklist).where(GalaxyClusterBlocklist.cluster_uuid == cluster.uuid)
        result = (await session.execute(statement)).scalars().all()
        if len(result) > 0:
            clusters.remove(cluster)
    return clusters

filter_blocked_events(session, events, use_event_blocklist, use_org_blocklist) async

Clear the list from events that are listed as blocked in the misp database. Also, if the org is blocked, the events in the org are removed from the list. Return the list without the blocked events. :param events: list to remove blocked events from :type events: list[AddEditGetEventDetails] :param use_event_blocklist: if True, blocked events are removed from the list :type use_event_blocklist: bool :param use_org_blocklist: if True, the events from blocked orgs are removed from the list :type use_org_blocklist: bool :return: the list without the blocked events :rtype: list[MispEvent]

Source code in src/mmisp/worker/misp_database/misp_sql.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
async def filter_blocked_events(
    session: AsyncSession, events: list[MispMinimalEvent], use_event_blocklist: bool, use_org_blocklist: bool
) -> list[MispMinimalEvent]:
    """
    Clear the list from events that are listed as blocked in the misp database. Also, if the org is blocked, the
    events in the org are removed from the list. Return the list without the blocked events.
    :param events: list to remove blocked events from
    :type events: list[AddEditGetEventDetails]
    :param use_event_blocklist: if True, blocked events are removed from the list
    :type use_event_blocklist: bool
    :param use_org_blocklist: if True, the events from blocked orgs are removed from the list
    :type use_org_blocklist: bool
    :return: the list without the blocked events
    :rtype: list[MispEvent]
    """
    if use_org_blocklist:
        for event in events:
            statement = select(EventBlocklist).where(OrgBlocklist.org_uuid == event.org_c_uuid)
            result = (await session.execute(statement)).scalars().all()
            if len(result) > 0:
                events.remove(event)
    if use_event_blocklist:
        for event in events:
            statement = select(EventBlocklist).where(EventBlocklist.event_uuid == event.uuid)
            result = (await session.execute(statement)).scalars().all()
            if len(result) > 0:
                events.remove(event)
    return events

get_api_authkey(session, server_id) async

Method to get the API authentication key of the server with the given ID. :param server_id: The ID of the server. :type server_id: int :return: The API authentication key of the server. :rtype: str

Source code in src/mmisp/worker/misp_database/misp_sql.py
26
27
28
29
30
31
32
33
34
35
36
async def get_api_authkey(session: AsyncSession, server_id: int) -> str | None:
    """
    Method to get the API authentication key of the server with the given ID.
    :param server_id: The ID of the server.
    :type server_id: int
    :return: The API authentication key of the server.
    :rtype: str
    """
    statement = select(Server.authkey).where(Server.id == server_id)
    result: str | None = (await session.execute(statement)).scalars().first()
    return result

get_attribute_tag(session, attribute_tag_id) async

Method to get the AttributeTag object with the given ID.

:param attribute_tag_id: The ID of the attribute-tag object. :type attribute_tag_id: int :return: The AttributeTag object or None if it doesn't exist. :rtype: AttributeTag | None

Source code in src/mmisp/worker/misp_database/misp_sql.py
380
381
382
383
384
385
386
387
388
389
390
391
async def get_attribute_tag(session: AsyncSession, attribute_tag_id: int) -> AttributeTag | None:
    """
    Method to get the AttributeTag object with the given ID.

    :param attribute_tag_id: The ID of the attribute-tag object.
    :type attribute_tag_id: int
    :return: The AttributeTag object or None if it doesn't exist.
    :rtype: AttributeTag | None
    """

    statement = select(AttributeTag).where(AttributeTag.id == attribute_tag_id)
    return (await session.execute(statement)).scalars().first()

get_attribute_tag_id(session, attribute_id, tag_id) async

Method to get the ID of the attribute-tag object associated with the given attribute-ID and tag-ID.

:param attribute_id: The ID of the attribute. :type attribute_id: int :param tag_id: The ID of the tag. :type tag_id: int :return: The ID of the attribute-tag object or -1 if the object does not exist. :rtype: int

Source code in src/mmisp/worker/misp_database/misp_sql.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
async def get_attribute_tag_id(session: AsyncSession, attribute_id: int, tag_id: int) -> int:
    """
    Method to get the ID of the attribute-tag object associated with the given attribute-ID and tag-ID.

    :param attribute_id: The ID of the attribute.
    :type attribute_id: int
    :param tag_id: The ID of the tag.
    :type tag_id: int
    :return: The ID of the attribute-tag object or -1 if the object does not exist.
    :rtype: int
    """

    statement = select(AttributeTag.id).where(
        and_(AttributeTag.attribute_id == attribute_id, AttributeTag.tag_id == tag_id)
    )
    search_result: int | None = (await session.execute(statement)).scalar()
    if search_result:
        return search_result
    else:
        return -1

get_attributes_with_same_value(session, value) async

Method to get all attributes with the same value from database. :param value: to get attributes with :type value: str :return: list of attributes with the same value :rtype: list[Attribute]

Source code in src/mmisp/worker/misp_database/misp_sql.py
87
88
89
90
91
92
93
94
95
96
97
async def get_attributes_with_same_value(session: AsyncSession, value: str) -> list[Attribute]:
    """
    Method to get all attributes with the same value from database.
    :param value: to get attributes with
    :type value: str
    :return: list of attributes with the same value
    :rtype: list[Attribute]
    """
    statement = select(Attribute).where(and_(Attribute.value == value, Attribute.disable_correlation == false()))  # type: ignore
    result: list[Attribute] = list((await session.execute(statement)).scalars().all())
    return result

get_event_tag_id(session, event_id, tag_id) async

Method to get the ID of the event-tag object associated with the given event-ID and tag-ID.

:param event_id: The ID of the event. :type event_id: int :param tag_id: The ID of the tag. :type tag_id: int :return: The ID of the event-tag object or -1 if the object does not exist. :rtype: int

Source code in src/mmisp/worker/misp_database/misp_sql.py
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
async def get_event_tag_id(session: AsyncSession, event_id: int, tag_id: int) -> int:
    """
    Method to get the ID of the event-tag object associated with the given event-ID and tag-ID.

    :param event_id: The ID of the event.
    :type event_id: int
    :param tag_id: The ID of the tag.
    :type tag_id: int
    :return: The ID of the event-tag object or -1 if the object does not exist.
    :rtype: int
    """

    statement = select(EventTag.id).where(and_(EventTag.event_id == event_id, EventTag.tag_id == tag_id))
    search_result: int | None = (await session.execute(statement)).scalar()
    if search_result:
        return search_result
    else:
        return -1

get_excluded_correlations(session) async

Method to get all values from correlation_exclusions table. :return: all values from correlation_exclusions table :rtype: list[str]

Source code in src/mmisp/worker/misp_database/misp_sql.py
121
122
123
124
125
126
127
128
async def get_excluded_correlations(session: AsyncSession) -> Sequence[str]:
    """
    Method to get all values from correlation_exclusions table.
    :return: all values from correlation_exclusions table
    :rtype: list[str]
    """
    statement = select(CorrelationExclusions.value)
    return (await session.execute(statement)).scalars().all()

get_number_of_correlations(session, value, only_over_correlating_table) async

Returns the number of correlations of value in the database. If only_over_correlating_table is True, only the value in the over_correlating_values table is returned. Else the number of correlations in the default_correlations table is returned Attention: It is assumed that the value is in the over_correlating_values table if only_over_correlating_table is True. :param value: to get number of correlations of :type value: str :param only_over_correlating_table: if True, only the value in the over_correlating_values table is returned :type only_over_correlating_table: bool :return: number of correlations of value in the database

Source code in src/mmisp/worker/misp_database/misp_sql.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
async def get_number_of_correlations(session: AsyncSession, value: str, only_over_correlating_table: bool) -> int:
    """
    Returns the number of correlations of value in the database. If only_over_correlating_table is True, only the
    value in the over_correlating_values table is returned. Else the number of  correlations in the
    default_correlations table is returned
    Attention: It is assumed that the value is in the over_correlating_values table if only_over_correlating_table
     is True.
    :param value: to get number of correlations of
    :type value: str
    :param only_over_correlating_table: if True, only the value in the over_correlating_values table is returned
    :type only_over_correlating_table: bool
    :return: number of correlations of value in the database
    """
    if only_over_correlating_table:
        statement = select(OverCorrelatingValue.occurrence).where(OverCorrelatingValue.value == value)
        result: int | None = (await session.execute(statement)).scalars().first()
        if result:
            return result
        raise ValueError(f"Value {value} not in over_correlating_values table")
    else:
        search_statement = select(CorrelationValue.id).where(CorrelationValue.value == value)
        value_id: int | None = (await session.execute(search_statement)).scalars().first()
        if value_id:
            statement = select(DefaultCorrelation.id).where(DefaultCorrelation.value_id == value_id)
            all_elements: Sequence = (await session.execute(statement)).scalars().all()
            return len(all_elements)
        else:
            return 0

get_over_correlating_values(session) async

Method to get all values from over_correlating_values table with their occurrence. :return: all values from over_correlating_values table with their occurrence :rtype: list[tuple[str, int]]

Source code in src/mmisp/worker/misp_database/misp_sql.py
111
112
113
114
115
116
117
118
async def get_over_correlating_values(session: AsyncSession) -> list[tuple[str, int]]:
    """
    Method to get all values from over_correlating_values table with their occurrence.
    :return: all values from over_correlating_values table with their occurrence
    :rtype: list[tuple[str, int]]
    """
    statement = select(OverCorrelatingValue.value, OverCorrelatingValue.occurrence)
    return cast(list[tuple[str, int]], (await session.execute(statement)).all())

get_post(session, post_id) async

Method to get a post from database. :param post_id: the id of the post to get :type post_id: int :return: the post with the given id :rtype: MispPost

Source code in src/mmisp/worker/misp_database/misp_sql.py
139
140
141
142
143
144
145
146
147
148
149
150
151
async def get_post(session: AsyncSession, post_id: int) -> Post:
    """
    Method to get a post from database.
    :param post_id: the id of the post to get
    :type post_id: int
    :return: the post with the given id
    :rtype: MispPost
    """
    statement = select(Post).where(Post.id == post_id)
    result: Post | None = (await session.execute(statement)).scalars().first()
    if result:
        return result
    raise ValueError(f"Post with ID {post_id} doesn't exist.")

get_values_with_correlation(session) async

" Method to get all values from correlation_values table. :return: all values from correlation_values table :rtype: list[str]

Source code in src/mmisp/worker/misp_database/misp_sql.py
100
101
102
103
104
105
106
107
108
async def get_values_with_correlation(session: AsyncSession) -> list[str]:
    """ "
    Method to get all values from correlation_values table.
    :return: all values from correlation_values table
    :rtype: list[str]
    """
    statement = select(CorrelationValue.value)
    result: Sequence = (await session.execute(statement)).scalars().all()
    return list(result)

is_excluded_correlation(session, value) async

Checks if value is in correlation_exclusions table. :param value: to check :type value: str :return: True if value is in correlation_exclusions table, False otherwise :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_sql.py
154
155
156
157
158
159
160
161
162
163
164
165
166
async def is_excluded_correlation(session: AsyncSession, value: str) -> bool:
    """
    Checks if value is in correlation_exclusions table.
    :param value: to check
    :type value: str
    :return: True if value is in correlation_exclusions table, False otherwise
    :rtype: bool
    """
    statement = select(CorrelationExclusions.id).where(CorrelationExclusions.value == value)
    result = (await session.execute(statement)).first()
    if result:
        return True
    return False

is_over_correlating_value(session, value) async

Checks if value is in over_correlating_values table. Doesn't check if value has more correlations in the database than the current threshold. :param value: to check :type value: str :return: True if value is in over_correlating_values table, False otherwise :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_sql.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
async def is_over_correlating_value(session: AsyncSession, value: str) -> bool:
    """
    Checks if value is in over_correlating_values table. Doesn't check if value has more correlations in the
    database than the current threshold.
    :param value: to check
    :type value: str
    :return: True if value is in over_correlating_values table, False otherwise
    :rtype: bool
    """
    statement = select(OverCorrelatingValue).where(OverCorrelatingValue.value == value)
    result: OverCorrelatingValue | None = (await session.execute(statement)).scalars().first()
    if result:
        return True
    return False

MispAPIConfigData

Bases: ConfigData

Source code in src/mmisp/worker/misp_database/misp_api_config.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class MispAPIConfigData(ConfigData):
    class Config:
        """
        Pydantic configuration.
        """

        validate_assignment: bool = True

    url: Annotated[str, constr(regex="^https?://\\w")] = "http://127.0.0.1"
    key: str = ""
    connect_timeout: NonNegativeFloat = 40
    read_timeout: NonNegativeFloat = 40

    def __init__(self: Self) -> None:
        super().__init__()
        self.read_from_env()

    def read_from_env(self: Self) -> None:
        """
        Read the environment variables and set the values to the class attributes that are used by the MISP API.
        """

        env_dict: dict = {
            "url": os.environ.get(ENV_MISP_API_URL),
            "key": os.environ.get(ENV_MISP_API_KEY),
            "connect_timeout": os.environ.get(ENV_MISP_API_CONNECT_TIMEOUT),
            "read_timeout": os.environ.get(ENV_MISP_API_READ_TIMEOUT),
        }

        for env in env_dict:
            value: str = env_dict[env]
            if value:
                try:
                    setattr(self, env, value)
                except ValidationError as validation_error:
                    _log.warning(f"Could not set {env} to {value}. Error: {validation_error}")

Config

Pydantic configuration.

Source code in src/mmisp/worker/misp_database/misp_api_config.py
19
20
21
22
23
24
class Config:
    """
    Pydantic configuration.
    """

    validate_assignment: bool = True

read_from_env()

Read the environment variables and set the values to the class attributes that are used by the MISP API.

Source code in src/mmisp/worker/misp_database/misp_api_config.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def read_from_env(self: Self) -> None:
    """
    Read the environment variables and set the values to the class attributes that are used by the MISP API.
    """

    env_dict: dict = {
        "url": os.environ.get(ENV_MISP_API_URL),
        "key": os.environ.get(ENV_MISP_API_KEY),
        "connect_timeout": os.environ.get(ENV_MISP_API_CONNECT_TIMEOUT),
        "read_timeout": os.environ.get(ENV_MISP_API_READ_TIMEOUT),
    }

    for env in env_dict:
        value: str = env_dict[env]
        if value:
            try:
                setattr(self, env, value)
            except ValidationError as validation_error:
                _log.warning(f"Could not set {env} to {value}. Error: {validation_error}")

Encapsulates the response data for the jobs router.

CreateJobResponse

Bases: BaseModel

Encapsulates the response for a create jobs API call

Source code in src/mmisp/worker/api/response_schemas.py
42
43
44
45
46
47
48
49
50
class CreateJobResponse(BaseModel):
    """
    Encapsulates the response for a create jobs API call
    """

    success: bool
    """The API call was successful or not"""
    job_id: str | None
    """The id of the created job"""

job_id: str | None instance-attribute

The id of the created job

success: bool instance-attribute

The API call was successful or not

DeleteJobResponse

Bases: BaseModel

Encapsulates the response for a remove jobs API call

Source code in src/mmisp/worker/api/response_schemas.py
53
54
55
56
57
58
59
class DeleteJobResponse(BaseModel):
    """
    Encapsulates the response for a remove jobs API call
    """

    success: bool
    """The API call was successful or not"""

success: bool instance-attribute

The API call was successful or not

ExceptionResponse

Bases: BaseModel

Encapsulates the response for a jobs where an exception was raised

Source code in src/mmisp/worker/api/response_schemas.py
33
34
35
36
37
38
39
class ExceptionResponse(BaseModel):
    """
    Encapsulates the response for a jobs where an exception was raised
    """

    message: str
    """A costume message which describes the error that occurred"""

message: str instance-attribute

A costume message which describes the error that occurred

JobStatusEnum

Bases: str, Enum

Encapsulates the status of a Job

Source code in src/mmisp/worker/api/response_schemas.py
10
11
12
13
14
15
16
17
18
19
class JobStatusEnum(str, Enum):
    """
    Encapsulates the status of a Job
    """

    SUCCESS = "success"
    FAILED = "failed"
    IN_PROGRESS = "inProgress"
    QUEUED = "queued"
    REVOKED = "revoked"

JobStatusResponse

Bases: BaseModel

Encapsulates the response for a jobs status API call

Source code in src/mmisp/worker/api/response_schemas.py
22
23
24
25
26
27
28
29
30
class JobStatusResponse(BaseModel):
    """
    Encapsulates the response for a jobs status API call
    """

    status: JobStatusEnum
    """The status of the job"""
    message: str
    """A costume message which describes the success of getting the job status"""

message: str instance-attribute

A costume message which describes the success of getting the job status

status: JobStatusEnum instance-attribute

The status of the job

StartStopWorkerResponse

Bases: BaseModel

Represents the API response of starting and stopping a worker

Source code in src/mmisp/worker/api/response_schemas.py
77
78
79
80
81
82
83
84
85
86
87
class StartStopWorkerResponse(BaseModel):
    """
    Represents the API response of starting and stopping a worker
    """

    success: bool
    """The API call was successful or not"""
    message: str
    """A costume message which describes the success of starting or stopping the worker"""
    url: str
    """The API url"""

message: str instance-attribute

A costume message which describes the success of starting or stopping the worker

success: bool instance-attribute

The API call was successful or not

url: str instance-attribute

The API url

WorkerStatusEnum

Bases: str, Enum

Represents different statuses of a worker

Source code in src/mmisp/worker/api/response_schemas.py
67
68
69
70
71
72
73
74
class WorkerStatusEnum(str, Enum):
    """
    Represents different statuses of a worker
    """

    IDLE = "idle"
    WORKING = "working"
    DEACTIVATED = "deactivated"

WorkerStatusResponse

Bases: BaseModel

Represents the API response of getting the status of a worker

Source code in src/mmisp/worker/api/response_schemas.py
90
91
92
93
94
95
96
97
98
class WorkerStatusResponse(BaseModel):
    """
    Represents the API response of getting the status of a worker
    """

    status: WorkerStatusEnum
    """The status of the worker"""
    jobs_queued: int
    """The number of queued jobs of the worker"""

jobs_queued: int instance-attribute

The number of queued jobs of the worker

status: WorkerStatusEnum instance-attribute

The status of the worker

Encapsulates input data classes for the jobs router.

UserData

Bases: BaseModel

Data class for user_id

Source code in src/mmisp/worker/api/requests_schemas.py
10
11
12
13
14
15
16
class UserData(BaseModel):
    """
    Data class for user_id
    """

    user_id: int
    """The id of the user"""

user_id: int instance-attribute

The id of the user

WorkerEnum

Bases: StrEnum

Represents the implemented workers

Source code in src/mmisp/worker/api/requests_schemas.py
19
20
21
22
23
24
25
26
27
28
29
class WorkerEnum(StrEnum):
    """
    Represents the implemented workers
    """

    PULL = "pull"
    PUSH = "push"
    CORRELATE = "correlation"
    ENRICHMENT = "enrichment"
    SEND_EMAIL = "sendEmail"
    PROCESS_FREE_TEXT = "processFreeText"

worker_router: APIRouter = APIRouter(prefix='/worker') module-attribute

Every method in this file is a route for the worker_router every endpoint is prefixed with /worker and requires the user to be verified

Encapsulates input data classes for the jobs router.

UserData

Bases: BaseModel

Data class for user_id

Source code in src/mmisp/worker/api/requests_schemas.py
10
11
12
13
14
15
16
class UserData(BaseModel):
    """
    Data class for user_id
    """

    user_id: int
    """The id of the user"""

user_id: int instance-attribute

The id of the user

WorkerEnum

Bases: StrEnum

Represents the implemented workers

Source code in src/mmisp/worker/api/requests_schemas.py
19
20
21
22
23
24
25
26
27
28
29
class WorkerEnum(StrEnum):
    """
    Represents the implemented workers
    """

    PULL = "pull"
    PUSH = "push"
    CORRELATE = "correlation"
    ENRICHMENT = "enrichment"
    SEND_EMAIL = "sendEmail"
    PROCESS_FREE_TEXT = "processFreeText"

Every method in this file is a route for the job_router every endpoint is prefixed with /job and requires the user to be verified

verified(credentials=Depends(HTTPBearer(auto_error=False)))

A function to verify the api key that is sent by the client if the api key is not correct, it will raise an HTTPException

:param credentials: credentials sent by the client :type credentials: HTTPAuthorizationCredentials

Source code in src/mmisp/worker/api/api_verification.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
def verified(credentials: HTTPAuthorizationCredentials = Depends(HTTPBearer(auto_error=False))) -> None:
    """
    A function to verify the api key that is sent by the client
    if the api key is not correct, it will raise an HTTPException

    :param credentials: credentials sent by the client
    :type credentials: HTTPAuthorizationCredentials
    """
    if credentials is not None:
        if credentials.credentials == system_config_data.api_key:
            return
    raise HTTPException(status_code=403, detail="authentication failed")

PluginInterface

Bases: Protocol

Protocol class (interface) providing the necessary functions each plugin must implement to be able to become loaded.

Source code in src/mmisp/worker/plugins/loader.py
15
16
17
18
19
20
21
22
23
24
25
26
27
class PluginInterface(Protocol):
    """
    Protocol class (interface) providing the necessary functions each plugin must implement to be able to become loaded.
    """

    @staticmethod
    def register(factory: PluginFactory) -> None:
        """
        Registers the plugin in the given factory.

        :param factory: The factory in which the plugin is registered.
        :type factory: PluginFactory
        """

register(factory) staticmethod

Registers the plugin in the given factory.

:param factory: The factory in which the plugin is registered. :type factory: PluginFactory

Source code in src/mmisp/worker/plugins/loader.py
20
21
22
23
24
25
26
27
@staticmethod
def register(factory: PluginFactory) -> None:
    """
    Registers the plugin in the given factory.

    :param factory: The factory in which the plugin is registered.
    :type factory: PluginFactory
    """

PluginLoader

Implements the loading and registration process of plugins.

Source code in src/mmisp/worker/plugins/loader.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class PluginLoader:
    """
    Implements the loading and registration process of plugins.
    """

    PLUGIN_MODULE_NAME_PREFIX: str = "mmisp_plugins"

    @classmethod
    def _import_module(cls: Type["PluginLoader"], path: str) -> PluginInterface:
        plugin_dir_name: str = os.path.basename(os.path.split(path)[0])
        module_name: str = os.path.splitext(os.path.basename(path))[0]
        extended_module_name: str = f"{cls.PLUGIN_MODULE_NAME_PREFIX}.{plugin_dir_name}.{module_name}"

        module: ModuleType
        if extended_module_name in sys.modules.keys():
            module = sys.modules[extended_module_name]
        else:
            module_spec: ModuleSpec | None
            if os.path.isfile(path):
                module_spec = importlib.util.spec_from_file_location(extended_module_name, path)
            else:
                module_init_path: str = os.path.join(path, "__init__.py")
                module_spec = importlib.util.spec_from_file_location(
                    extended_module_name, module_init_path, submodule_search_locations=[]
                )
            if module_spec is None:
                raise ValueError(f"Could not load module: {path}")

            module = importlib.util.module_from_spec(module_spec)
            sys.modules[extended_module_name] = module
            try:
                if module_spec.loader is None:
                    raise ValueError()
                module_spec.loader.exec_module(module)
            except FileNotFoundError as file_not_found_error:
                raise file_not_found_error
            except Exception as import_error:
                raise PluginImportError(
                    message=f"An error occurred while importing the plugin '{path}'. Error: {import_error}"
                )

        return cast(PluginInterface, module)

    @classmethod
    def load_plugins(cls: Type["PluginLoader"], plugins: list[str], factory: PluginFactory) -> None:
        """
        Loads the specified plugins and registers them in the given factory.

        :param plugins: A list of paths to plugin modules to load.
        :type plugins: list[str]
        :param factory: The factory in which the plugins are to be registered.
        :type factory: PluginFactory
        """

        for plugin in plugins:
            plugin_module: PluginInterface
            try:
                plugin_module = cls._import_module(plugin)
            except FileNotFoundError as file_not_found_error:
                _log.exception(
                    f"Plugin {plugin}: The plugin could not be imported. File not found: {file_not_found_error}"
                )
                continue
            except PluginImportError as import_error:
                _log.exception(f"An error occurred while importing the plugin 'Ì›{plugin}'. Error: {import_error}")
                continue

            try:
                plugin_module.register(factory)
            except PluginRegistrationError as registration_error:
                _log.exception(
                    f"An error occurred while registering the plugin '{plugin}'. Error: {registration_error}"
                )
                continue

    @classmethod
    def load_plugins_from_directory(cls: Type["PluginLoader"], directory: str, factory: PluginFactory) -> None:
        """
        Loads all plugins that are in the specified directory.

        :param directory: The path to a directory containing plugins to load.
        :type directory: str
        :param factory: The factory in which the plugins are to be registered.
        :type factory: PluginFactory
        """

        if not directory:
            raise ValueError("Path to a directory is required. May not be empty.")
        if not os.path.isdir(directory):
            raise ValueError(f"The directory '{directory}' doesn't exist.")

        path_content: list[str] = os.listdir(directory)
        plugin_modules: list[str] = []
        for element in path_content:
            element_path: str = os.path.join(directory, element)
            if os.path.isdir(element_path):
                if os.path.isfile(os.path.join(element_path, "__init__.py")):
                    plugin_modules.append(element_path)
            else:
                file_extension: str = os.path.splitext(element)[1]
                if file_extension == ".py":
                    plugin_modules.append(element_path)

        cls.load_plugins(plugin_modules, factory)

load_plugins(plugins, factory) classmethod

Loads the specified plugins and registers them in the given factory.

:param plugins: A list of paths to plugin modules to load. :type plugins: list[str] :param factory: The factory in which the plugins are to be registered. :type factory: PluginFactory

Source code in src/mmisp/worker/plugins/loader.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
@classmethod
def load_plugins(cls: Type["PluginLoader"], plugins: list[str], factory: PluginFactory) -> None:
    """
    Loads the specified plugins and registers them in the given factory.

    :param plugins: A list of paths to plugin modules to load.
    :type plugins: list[str]
    :param factory: The factory in which the plugins are to be registered.
    :type factory: PluginFactory
    """

    for plugin in plugins:
        plugin_module: PluginInterface
        try:
            plugin_module = cls._import_module(plugin)
        except FileNotFoundError as file_not_found_error:
            _log.exception(
                f"Plugin {plugin}: The plugin could not be imported. File not found: {file_not_found_error}"
            )
            continue
        except PluginImportError as import_error:
            _log.exception(f"An error occurred while importing the plugin 'Ì›{plugin}'. Error: {import_error}")
            continue

        try:
            plugin_module.register(factory)
        except PluginRegistrationError as registration_error:
            _log.exception(
                f"An error occurred while registering the plugin '{plugin}'. Error: {registration_error}"
            )
            continue

load_plugins_from_directory(directory, factory) classmethod

Loads all plugins that are in the specified directory.

:param directory: The path to a directory containing plugins to load. :type directory: str :param factory: The factory in which the plugins are to be registered. :type factory: PluginFactory

Source code in src/mmisp/worker/plugins/loader.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
@classmethod
def load_plugins_from_directory(cls: Type["PluginLoader"], directory: str, factory: PluginFactory) -> None:
    """
    Loads all plugins that are in the specified directory.

    :param directory: The path to a directory containing plugins to load.
    :type directory: str
    :param factory: The factory in which the plugins are to be registered.
    :type factory: PluginFactory
    """

    if not directory:
        raise ValueError("Path to a directory is required. May not be empty.")
    if not os.path.isdir(directory):
        raise ValueError(f"The directory '{directory}' doesn't exist.")

    path_content: list[str] = os.listdir(directory)
    plugin_modules: list[str] = []
    for element in path_content:
        element_path: str = os.path.join(directory, element)
        if os.path.isdir(element_path):
            if os.path.isfile(os.path.join(element_path, "__init__.py")):
                plugin_modules.append(element_path)
        else:
            file_extension: str = os.path.splitext(element)[1]
            if file_extension == ".py":
                plugin_modules.append(element_path)

    cls.load_plugins(plugin_modules, factory)

Plugin

Bases: Protocol

Interface providing all attributes and methods a plugin must implement.

Source code in src/mmisp/worker/plugins/plugin.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
class Plugin(Protocol):
    """
    Interface providing all attributes and methods a plugin must implement.
    """

    PLUGIN_INFO: PluginInfo
    """Information about the plugin."""

    def run(self: Self) -> Any:
        """
        Entry point of the plugin. Runs the plugin and returns any existing result.

        :return: The result the plugin returns
        :rtype Any
        """
        ...

PLUGIN_INFO: PluginInfo instance-attribute

Information about the plugin.

run()

Entry point of the plugin. Runs the plugin and returns any existing result.

:return: The result the plugin returns :rtype Any

Source code in src/mmisp/worker/plugins/plugin.py
14
15
16
17
18
19
20
21
def run(self: Self) -> Any:
    """
    Entry point of the plugin. Runs the plugin and returns any existing result.

    :return: The result the plugin returns
    :rtype Any
    """
    ...

PluginFactory

Bases: Generic[_T, _U], ABC

Provides a Factory for registering and managing plugins.

Instantiation of plugins is not part of this class.

Source code in src/mmisp/worker/plugins/factory.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class PluginFactory(Generic[_T, _U], ABC):
    """
    Provides a Factory for registering and managing plugins.

    Instantiation of plugins is not part of this class.
    """

    def __init__(self: Self) -> None:
        """
        Constructs a new plugin factory without any plugins registered.
        """

        self._plugins: dict[str, type[_T]] = {}

    def register(self: Self, plugin: type[_T]) -> None:
        """
        Registers a new plugin.

        :param plugin: The class of the plugin to register.
        :type plugin: type[T]
        :raises NotAValidPlugin: If the plugin is missing the 'PLUGIN_INFO' attribute.
        :raises PluginRegistrationError: If there is already a plugin registered with the same name.
        """

        plugin_info: PluginInfo
        try:
            plugin_info = cast(_U, plugin.PLUGIN_INFO)
        except AttributeError:
            raise NotAValidPlugin(message="Attribute 'PLUGIN_INFO' is missing.")

        if plugin_info.NAME not in self._plugins:
            self._plugins[plugin_info.NAME] = plugin
        elif plugin != self._plugins[plugin_info.NAME]:
            raise PluginRegistrationError(
                f"Registration not possible. The are at least two plugins with the same name '{plugin_info.NAME}'."
            )
        else:
            # If plugin is already registered, do nothing.
            pass

    def unregister(self: Self, plugin_name: str) -> None:
        """
        Unregisters a plugin.

        The plugin can be registered again later.

        :param plugin_name: The name of the plugin to remove from the factory.
        :type plugin_name: str
        :raises PluginNotFound: If there is no plugin with the specified name.
        """

        if not self.is_plugin_registered(plugin_name):
            raise PluginNotFound(message=f"Unknown plugin '{plugin_name}'. Cannot be removed.")

        self._plugins.pop(plugin_name)

    def get_plugin_info(self: Self, plugin_name: str) -> _U:
        """
        Returns information about a registered plugin.

        :param plugin_name: The name of the plugin.
        :type plugin_name: str
        :return: The information about the plugin.
        :rtype: U
        :raises PluginNotFound: If there is no plugin with the specified name.
        """

        if not self.is_plugin_registered(plugin_name):
            raise PluginNotFound(message=f"The specified plugin '{plugin_name}' is not known.")

        return cast(_U, self._plugins[plugin_name].PLUGIN_INFO)

    def get_plugins(self: Self) -> list[_U]:
        """
        Returns a list of registered Plugins.

        :return: The list of plugins.
        :rtype: list[PluginInfo]
        """

        info: list[_U] = []
        for plugin in self._plugins:
            info.append(cast(_U, self._plugins[plugin].PLUGIN_INFO))

        return info

    def is_plugin_registered(self: Self, plugin_name: str) -> bool:
        """
        Checks if the given plugin is registered in the factory.

        :param plugin_name: The name of the plugin to check.
        :type plugin_name: str
        :return: True if the plugin is registered
        """

        if plugin_name:
            return plugin_name in self._plugins
        else:
            raise ValueError("Plugin name may not be emtpy.")

__init__()

Constructs a new plugin factory without any plugins registered.

Source code in src/mmisp/worker/plugins/factory.py
19
20
21
22
23
24
def __init__(self: Self) -> None:
    """
    Constructs a new plugin factory without any plugins registered.
    """

    self._plugins: dict[str, type[_T]] = {}

get_plugin_info(plugin_name)

Returns information about a registered plugin.

:param plugin_name: The name of the plugin. :type plugin_name: str :return: The information about the plugin. :rtype: U :raises PluginNotFound: If there is no plugin with the specified name.

Source code in src/mmisp/worker/plugins/factory.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def get_plugin_info(self: Self, plugin_name: str) -> _U:
    """
    Returns information about a registered plugin.

    :param plugin_name: The name of the plugin.
    :type plugin_name: str
    :return: The information about the plugin.
    :rtype: U
    :raises PluginNotFound: If there is no plugin with the specified name.
    """

    if not self.is_plugin_registered(plugin_name):
        raise PluginNotFound(message=f"The specified plugin '{plugin_name}' is not known.")

    return cast(_U, self._plugins[plugin_name].PLUGIN_INFO)

get_plugins()

Returns a list of registered Plugins.

:return: The list of plugins. :rtype: list[PluginInfo]

Source code in src/mmisp/worker/plugins/factory.py
84
85
86
87
88
89
90
91
92
93
94
95
96
def get_plugins(self: Self) -> list[_U]:
    """
    Returns a list of registered Plugins.

    :return: The list of plugins.
    :rtype: list[PluginInfo]
    """

    info: list[_U] = []
    for plugin in self._plugins:
        info.append(cast(_U, self._plugins[plugin].PLUGIN_INFO))

    return info

is_plugin_registered(plugin_name)

Checks if the given plugin is registered in the factory.

:param plugin_name: The name of the plugin to check. :type plugin_name: str :return: True if the plugin is registered

Source code in src/mmisp/worker/plugins/factory.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def is_plugin_registered(self: Self, plugin_name: str) -> bool:
    """
    Checks if the given plugin is registered in the factory.

    :param plugin_name: The name of the plugin to check.
    :type plugin_name: str
    :return: True if the plugin is registered
    """

    if plugin_name:
        return plugin_name in self._plugins
    else:
        raise ValueError("Plugin name may not be emtpy.")

register(plugin)

Registers a new plugin.

:param plugin: The class of the plugin to register. :type plugin: type[T] :raises NotAValidPlugin: If the plugin is missing the 'PLUGIN_INFO' attribute. :raises PluginRegistrationError: If there is already a plugin registered with the same name.

Source code in src/mmisp/worker/plugins/factory.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def register(self: Self, plugin: type[_T]) -> None:
    """
    Registers a new plugin.

    :param plugin: The class of the plugin to register.
    :type plugin: type[T]
    :raises NotAValidPlugin: If the plugin is missing the 'PLUGIN_INFO' attribute.
    :raises PluginRegistrationError: If there is already a plugin registered with the same name.
    """

    plugin_info: PluginInfo
    try:
        plugin_info = cast(_U, plugin.PLUGIN_INFO)
    except AttributeError:
        raise NotAValidPlugin(message="Attribute 'PLUGIN_INFO' is missing.")

    if plugin_info.NAME not in self._plugins:
        self._plugins[plugin_info.NAME] = plugin
    elif plugin != self._plugins[plugin_info.NAME]:
        raise PluginRegistrationError(
            f"Registration not possible. The are at least two plugins with the same name '{plugin_info.NAME}'."
        )
    else:
        # If plugin is already registered, do nothing.
        pass

unregister(plugin_name)

Unregisters a plugin.

The plugin can be registered again later.

:param plugin_name: The name of the plugin to remove from the factory. :type plugin_name: str :raises PluginNotFound: If there is no plugin with the specified name.

Source code in src/mmisp/worker/plugins/factory.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def unregister(self: Self, plugin_name: str) -> None:
    """
    Unregisters a plugin.

    The plugin can be registered again later.

    :param plugin_name: The name of the plugin to remove from the factory.
    :type plugin_name: str
    :raises PluginNotFound: If there is no plugin with the specified name.
    """

    if not self.is_plugin_registered(plugin_name):
        raise PluginNotFound(message=f"Unknown plugin '{plugin_name}'. Cannot be removed.")

    self._plugins.pop(plugin_name)

ASTypeValidator

Bases: TypeValidator

This Class implements a validationmethod for AS Numbers

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
class ASTypeValidator(TypeValidator):
    """
    This Class implements a validationmethod for AS Numbers
    """

    as_regex = re.compile(r"^as\d+$", re.IGNORECASE)

    def validate(self: Self, input_str: str) -> AttributeType | None:
        """
        This method is used when a String is validated as an ASAttribute

        :param input_str: input string to validate
        :type input_str: str
        :return: returns the AttributeType when an AS is found, otherwise None
        :rtype: AttributeType | None
        """
        if self.as_regex.match(input_str):
            return AttributeType(types=["AS"], default_type="AS", value=input_str.upper())
        return None

validate(input_str)

This method is used when a String is validated as an ASAttribute

:param input_str: input string to validate :type input_str: str :return: returns the AttributeType when an AS is found, otherwise None :rtype: AttributeType | None

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
395
396
397
398
399
400
401
402
403
404
405
406
def validate(self: Self, input_str: str) -> AttributeType | None:
    """
    This method is used when a String is validated as an ASAttribute

    :param input_str: input string to validate
    :type input_str: str
    :return: returns the AttributeType when an AS is found, otherwise None
    :rtype: AttributeType | None
    """
    if self.as_regex.match(input_str):
        return AttributeType(types=["AS"], default_type="AS", value=input_str.upper())
    return None

BTCTypeValidator

Bases: TypeValidator

This Class implements a validationmethod for bitcoin-addresses

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
class BTCTypeValidator(TypeValidator):
    """
    This Class implements a validationmethod for bitcoin-addresses
    """

    bitcoin_address_regex = re.compile(
        r"^(?:[13][a-km-zA-HJ-NP-Z1-9]{25,34}|(bc|tb)1[023456789acdefghjklmnpqrstuvwxyz]{11,71})$", re.IGNORECASE
    )

    def validate(self: Self, input_str: str) -> AttributeType | None:
        """
        This method is used when a String is validated as an BTCAttribute

        :param input_str: input string to validate
        :type input_str: str
        :return: returns the AttributeType when a BTC address is found, otherwise None
        :rtype: AttributeType | None
        """
        if self.bitcoin_address_regex.match(input_str):
            return AttributeType(types=["btc"], default_type="btc", value=input_str)
        return None

validate(input_str)

This method is used when a String is validated as an BTCAttribute

:param input_str: input string to validate :type input_str: str :return: returns the AttributeType when a BTC address is found, otherwise None :rtype: AttributeType | None

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
418
419
420
421
422
423
424
425
426
427
428
429
def validate(self: Self, input_str: str) -> AttributeType | None:
    """
    This method is used when a String is validated as an BTCAttribute

    :param input_str: input string to validate
    :type input_str: str
    :return: returns the AttributeType when a BTC address is found, otherwise None
    :rtype: AttributeType | None
    """
    if self.bitcoin_address_regex.match(input_str):
        return AttributeType(types=["btc"], default_type="btc", value=input_str)
    return None

CVETypeValidator

Bases: TypeValidator

This Class implements a validationmethod for vulnerabilites

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
class CVETypeValidator(TypeValidator):
    """
    This Class implements a validationmethod for vulnerabilites
    """

    cve_regex = re.compile(r"^cve-\d{4}-\d{4,9}$", re.IGNORECASE)

    def validate(self: Self, input_str: str) -> AttributeType | None:
        """
         This method is used when a String is validated as an CVEAttribute

        :param input_str: input string to validate
        :type input_str: str
        :return: returns the AttributeType when a CVE is found, otherwise None
        :rtype: AttributeType | None
        """
        if self.cve_regex.match(input_str):  # vaildates a CVE
            return AttributeType(
                types=["vulnerability"], default_type="vulnerability", value=input_str.upper()
            )  # 'CVE' must be uppercase
        return None

validate(input_str)

This method is used when a String is validated as an CVEAttribute

:param input_str: input string to validate :type input_str: str :return: returns the AttributeType when a CVE is found, otherwise None :rtype: AttributeType | None

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
344
345
346
347
348
349
350
351
352
353
354
355
356
357
def validate(self: Self, input_str: str) -> AttributeType | None:
    """
     This method is used when a String is validated as an CVEAttribute

    :param input_str: input string to validate
    :type input_str: str
    :return: returns the AttributeType when a CVE is found, otherwise None
    :rtype: AttributeType | None
    """
    if self.cve_regex.match(input_str):  # vaildates a CVE
        return AttributeType(
            types=["vulnerability"], default_type="vulnerability", value=input_str.upper()
        )  # 'CVE' must be uppercase
    return None

DomainFilenameTypeValidator

Bases: TypeValidator

This Class implements a validationmethod for Domain- and Filenames

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
class DomainFilenameTypeValidator(TypeValidator):
    """
    This Class implements a validationmethod for Domain- and Filenames
    """

    _securityVendorDomains = config.security_vendors
    _domain_pattern = re.compile(r"^([-\w]+\.)+[A-Z0-9-]+$", re.IGNORECASE | re.UNICODE)
    _link_pattern = re.compile(r"^https://([^/]*)", re.IGNORECASE)

    def validate(self: Self, input_str: str) -> AttributeType | None:
        """
        This method is used when a String is validated as a Domain- or FilenameAttribute
        it checks if the string is a Domain, URL, Link, Filename or a Regkey and returns the AttributeType,
        otherwise None

        :param input_str: the string to validate
        :type input_str: str
        :return: returns the AttributeType when a Domain or Filename is found, otherwise None
        :rtype: AttributeType | None
        """

        input_without_port: str = self._remove_port(input_str)
        if "." in input_without_port:
            split_input: list[str] = input_without_port.split(".")
            if self._domain_pattern.match(input_without_port) and PublicSuffixList().get_public_suffix(
                split_input[-1], strict=True
            ):  # validate TLD
                if len(split_input) > 2:
                    return AttributeType(
                        types=["hostname", "domain", "url", "filename"],
                        default_type="hostname",
                        value=input_without_port,
                    )
                else:
                    return AttributeType(types=["domain", "filename"], default_type="domain", value=input_without_port)
            else:
                if len(split_input) > 1 and (url(input_without_port) or url("http://" + input_without_port)):
                    if self._is_link(input_without_port):
                        return AttributeType(types=["link"], default_type="link", value=input_without_port)
                    if "/" in input_without_port:
                        return AttributeType(types=["url"], default_type="url", value=input_without_port)
                if resolve_filename(input_str):
                    return AttributeType(types=["filename"], default_type="filename", value=input_str)

        if "\\" in input_str:
            split_input = input_without_port.split("\\")
            if split_input[0]:
                return AttributeType(types=["regkey"], default_type="regkey", value=input_str)
        return None

    @staticmethod
    def _remove_port(input_str: str) -> str:
        """
        This method is used to remove the port from a string

        :param input_str: the string to remove the port from
        :type input_str: str
        :return: returns the string without the port
        :rtype: str
        """
        if re.search(r"(:\d{2,5})", input_str):  # checks if the string has a port at the end
            return re.sub(r"(?<=:)[^:]+$", "", input_str).removesuffix(":")
        return input_str

    def _is_link(self: Self, input_str: str) -> bool:
        """
        This method is used to check if a string is a link by checking if the domain is a security vendor domain

        :param input_str: the string to check
        :type input_str: str
        :return: returns True when the string is a link, otherwise False
        :rtype: bool
        """
        found_link = self._link_pattern.match(input_str)

        if found_link:
            domain_to_check = ""
            domain_parts = list(reversed(found_link.group(1).split(".")))  # Extract and reverse the domain parts

            for domain_part in domain_parts:
                domain_to_check = domain_part + domain_to_check
                if domain_to_check in self._securityVendorDomains:
                    return True
                domain_to_check = "." + domain_to_check
        return False

validate(input_str)

This method is used when a String is validated as a Domain- or FilenameAttribute it checks if the string is a Domain, URL, Link, Filename or a Regkey and returns the AttributeType, otherwise None

:param input_str: the string to validate :type input_str: str :return: returns the AttributeType when a Domain or Filename is found, otherwise None :rtype: AttributeType | None

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def validate(self: Self, input_str: str) -> AttributeType | None:
    """
    This method is used when a String is validated as a Domain- or FilenameAttribute
    it checks if the string is a Domain, URL, Link, Filename or a Regkey and returns the AttributeType,
    otherwise None

    :param input_str: the string to validate
    :type input_str: str
    :return: returns the AttributeType when a Domain or Filename is found, otherwise None
    :rtype: AttributeType | None
    """

    input_without_port: str = self._remove_port(input_str)
    if "." in input_without_port:
        split_input: list[str] = input_without_port.split(".")
        if self._domain_pattern.match(input_without_port) and PublicSuffixList().get_public_suffix(
            split_input[-1], strict=True
        ):  # validate TLD
            if len(split_input) > 2:
                return AttributeType(
                    types=["hostname", "domain", "url", "filename"],
                    default_type="hostname",
                    value=input_without_port,
                )
            else:
                return AttributeType(types=["domain", "filename"], default_type="domain", value=input_without_port)
        else:
            if len(split_input) > 1 and (url(input_without_port) or url("http://" + input_without_port)):
                if self._is_link(input_without_port):
                    return AttributeType(types=["link"], default_type="link", value=input_without_port)
                if "/" in input_without_port:
                    return AttributeType(types=["url"], default_type="url", value=input_without_port)
            if resolve_filename(input_str):
                return AttributeType(types=["filename"], default_type="filename", value=input_str)

    if "\\" in input_str:
        split_input = input_without_port.split("\\")
        if split_input[0]:
            return AttributeType(types=["regkey"], default_type="regkey", value=input_str)
    return None

EmailTypeValidator

Bases: TypeValidator

This Class implements a validationmethod for email-adresses

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
class EmailTypeValidator(TypeValidator):
    """
    This Class implements a validationmethod for email-adresses
    """

    def validate(self: Self, input_str: str) -> AttributeType | None:
        """
        This method is used when a String is validated as an EmailAttribute

        :param input_str: input string to validate
        :type input_str: str
        :return: returns the AttributeType when an Email address is found, otherwise None
        :rtype: AttributeType | None
        """
        try:
            validate_email(input_str, check_deliverability=False)
            return AttributeType(
                types=["email", "email-src", "email-dst", "target-email", "whois-registrant-email"],
                default_type="email-src",
                value=input_str,
            )
        except EmailNotValidError:
            return None

validate(input_str)

This method is used when a String is validated as an EmailAttribute

:param input_str: input string to validate :type input_str: str :return: returns the AttributeType when an Email address is found, otherwise None :rtype: AttributeType | None

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
def validate(self: Self, input_str: str) -> AttributeType | None:
    """
    This method is used when a String is validated as an EmailAttribute

    :param input_str: input string to validate
    :type input_str: str
    :return: returns the AttributeType when an Email address is found, otherwise None
    :rtype: AttributeType | None
    """
    try:
        validate_email(input_str, check_deliverability=False)
        return AttributeType(
            types=["email", "email-src", "email-dst", "target-email", "whois-registrant-email"],
            default_type="email-src",
            value=input_str,
        )
    except EmailNotValidError:
        return None

HashTypeValidator

Bases: TypeValidator

This Class implements a validationmethod for md5,sha1,sha224,sha256,sha384 and sha512 hashes

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
class HashTypeValidator(TypeValidator):
    """
    This Class implements a validationmethod for md5,sha1,sha224,sha256,sha384 and sha512 hashes
    """

    class HashTypes(BaseModel):
        """
        This class encapsulates a HashTypes-Object, which is used to differentiate between single or composite hashes
        """

        single: list[str]
        composite: list[str]

    """
    The hex_hash_types Variable includes a dictionary that maps the length of hashes to the possible types
    """
    hex_hash_types = {
        32: HashTypes(
            single=["md5", "imphash", "x509-fingerprint-md5", "ja3-fingerprint-md5"],
            composite=["filename|md5", "filename|imphash"],
        ),
        40: HashTypes(
            single=["sha1", "pehash", "x509-fingerprint-sha1", "cdhash"], composite=["filename|sha1", "filename|pehash"]
        ),
        56: HashTypes(single=["sha224", "sha512/224"], composite=["filename|sha224", "filename|sha512/224"]),
        64: HashTypes(
            single=["sha256", "authentihash", "sha512/256", "x509-fingerprint-sha256"],
            composite=["filename|sha256", "filename|authentihash", "filename|sha512/256"],
        ),
        96: HashTypes(single=["sha384"], composite=["filename|sha384"]),
        128: HashTypes(single=["sha512"], composite=["filename|sha512"]),
    }

    def validate(self: Self, input_str: str) -> AttributeType | None:
        """
        This method is used when a String is validated as an HashAttribute
        it checks if the string is a single or composite hash and returns the AttributeType, otherwise None
        valid hashes are md5,sha1,sha224,sha256,sha384,sha512 and ssdeep

        :param input_str: input string to validate
        :type input_str: str
        :return: returns the AttributeType when a Hash is found, otherwise None
        :rtype: AttributeType | None
        """

        if "|" in input_str:  # checks if the string could be a composite hash
            split_string = input_str.split("|")
            if len(split_string) == 2 and resolve_filename(split_string[0]):  # checks if the first part is a filename
                found_hash: HashTypeValidator.HashTypes | None = self._resolve_hash(split_string[1])  # checks if the
                # second part is a hash
                if found_hash is not None:
                    return AttributeType(
                        types=found_hash.composite, default_type=found_hash.composite[0], value=input_str
                    )
                if self._resolve_ssdeep(split_string[1]):  # checks if the second part is a ssdeep hash
                    return AttributeType(types=["fi**lename|ssdeep"], default_type="filename|ssdeep", value=input_str)

        found_hash = self._resolve_hash(input_str)  # checks if the string is a single hash
        if found_hash is not None:
            hash_type = AttributeType(types=found_hash.single, default_type=found_hash.single[0], value=input_str)
            if BTCTypeValidator().validate(input_str):  # checks if the hash is a btc hash
                hash_type.types.append("btc")
            return hash_type
        if self._resolve_ssdeep(input_str):  # checks if the string is a ssdeep hash
            return AttributeType(types=["ssdeep"], default_type="ssdeep", value=input_str)
        return None

    @classmethod
    def _resolve_hash(cls: Type["HashTypeValidator"], input_str: str) -> HashTypes | None:
        """
        This function validates whether the input is a Hash and returns the possible types
        valid hashes are md5,sha1,sha224,sha256,sha384,sha512

        :param input_str: input string to validate
        :type input_str: str
        :return: returns the possible types of the hash when a hash was found, otherwise None
        :rtype: HashTypes
        """
        if len(input_str) in cls.hex_hash_types:
            try:
                int(input_str, 16)
                return cls.hex_hash_types[len(input_str)]
            except ValueError:
                return None
        return None

    @classmethod
    def _resolve_ssdeep(cls: Type["HashTypeValidator"], input_str: str) -> bool:
        """
        This method is used to resolve a ssdeep Hash

        :param input_str:
        :type input_str:
        :return:
        :rtype:
        """
        if re.match(r"^\d+:[0-9a-zA-Z/+]+:[0-9a-zA-Z/+]+$", input_str):
            if not re.match(r"^\d{1,2}:\d{1,2}:\d{1,2}$", input_str):
                return True
        return False

HashTypes

Bases: BaseModel

This class encapsulates a HashTypes-Object, which is used to differentiate between single or composite hashes

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
215
216
217
218
219
220
221
class HashTypes(BaseModel):
    """
    This class encapsulates a HashTypes-Object, which is used to differentiate between single or composite hashes
    """

    single: list[str]
    composite: list[str]

validate(input_str)

This method is used when a String is validated as an HashAttribute it checks if the string is a single or composite hash and returns the AttributeType, otherwise None valid hashes are md5,sha1,sha224,sha256,sha384,sha512 and ssdeep

:param input_str: input string to validate :type input_str: str :return: returns the AttributeType when a Hash is found, otherwise None :rtype: AttributeType | None

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def validate(self: Self, input_str: str) -> AttributeType | None:
    """
    This method is used when a String is validated as an HashAttribute
    it checks if the string is a single or composite hash and returns the AttributeType, otherwise None
    valid hashes are md5,sha1,sha224,sha256,sha384,sha512 and ssdeep

    :param input_str: input string to validate
    :type input_str: str
    :return: returns the AttributeType when a Hash is found, otherwise None
    :rtype: AttributeType | None
    """

    if "|" in input_str:  # checks if the string could be a composite hash
        split_string = input_str.split("|")
        if len(split_string) == 2 and resolve_filename(split_string[0]):  # checks if the first part is a filename
            found_hash: HashTypeValidator.HashTypes | None = self._resolve_hash(split_string[1])  # checks if the
            # second part is a hash
            if found_hash is not None:
                return AttributeType(
                    types=found_hash.composite, default_type=found_hash.composite[0], value=input_str
                )
            if self._resolve_ssdeep(split_string[1]):  # checks if the second part is a ssdeep hash
                return AttributeType(types=["fi**lename|ssdeep"], default_type="filename|ssdeep", value=input_str)

    found_hash = self._resolve_hash(input_str)  # checks if the string is a single hash
    if found_hash is not None:
        hash_type = AttributeType(types=found_hash.single, default_type=found_hash.single[0], value=input_str)
        if BTCTypeValidator().validate(input_str):  # checks if the hash is a btc hash
            hash_type.types.append("btc")
        return hash_type
    if self._resolve_ssdeep(input_str):  # checks if the string is a ssdeep hash
        return AttributeType(types=["ssdeep"], default_type="ssdeep", value=input_str)
    return None

IPTypeValidator

Bases: TypeValidator

This Class implements a validationmethod for simple IPv4 and IPv6 adresses, without a port

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class IPTypeValidator(TypeValidator):
    """
    This Class implements a validationmethod for simple IPv4 and IPv6 adresses, without a port
    """

    brackets_pattern: str = r"\[([^]]+)\]"

    def validate(self: Self, input_str: str) -> AttributeType | None:
        """
        This method is used when a String is validated as an IPAttribute
        if the string is an IP, it returns the AttributeType, otherwise None

        it checks if the string is an IPv4 or IPv6 IP with or without a Port, or a CIDR Block

        :param input_str: input string to validate
        :type input_str: str
        :return: returns the AttributeType when an IP is found, otherwise None
        :rtype: AttributeType | None
        """

        ip_without_port: str = input_str

        if self.__validate_ip(input_str):  # checks if the string is an IPv4 or IPv6 IP without a Port
            return AttributeType(types=["ip-dst", "ip-src", "ip-src/ip-dst"], default_type="ip-dst", value=input_str)

        if re.search(r"(:\d{2,5})", input_str):  # checks if the string has a port at the end
            port: str = input_str.split(":")[-1]
            ip_without_port = re.sub(r"(?<=:)\d+$", "", input_str).removesuffix(":")
            # removes [] from ipv6
            match = re.search(self.brackets_pattern, ip_without_port)
            if match:
                extracted_ipv6 = match.group(1)
                ip_without_port = ip_without_port.replace(match.group(0), extracted_ipv6)

            if self.__validate_ip(ip_without_port):
                return AttributeType(
                    types=["ip-dst|port", "ip-src|port", "ip-src|port/ip-dst|port"],
                    default_type="ip-dst|port",
                    value=ip_without_port + "|" + port,
                )

        if ip_without_port.find("/"):  # check if it is a CIDR Block
            split_ip: list[str] = ip_without_port.split("/")
            if len(split_ip) == 2:
                if self.__validate_ip(split_ip[0]) and split_ip[1].isnumeric():
                    return AttributeType(
                        types=["ip-dst", "ip-src", "ip-src/ip-dst"], default_type="ip-dst", value=ip_without_port
                    )
        return None

    def __validate_ip(self: Self, input_str: str) -> bool:
        """
        This method is used to check if a string is an IPv4 or IPv6 IP
        returns True when an IP is found, otherwise False

        :param input_str: the string to check
        :type input_str: str
        :return: returns True when an IP is found, otherwise False
        :rtype: bool
        """
        try:
            ipaddress.ip_address(input_str)
            return True
        except ValueError:
            return False

__validate_ip(input_str)

This method is used to check if a string is an IPv4 or IPv6 IP returns True when an IP is found, otherwise False

:param input_str: the string to check :type input_str: str :return: returns True when an IP is found, otherwise False :rtype: bool

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def __validate_ip(self: Self, input_str: str) -> bool:
    """
    This method is used to check if a string is an IPv4 or IPv6 IP
    returns True when an IP is found, otherwise False

    :param input_str: the string to check
    :type input_str: str
    :return: returns True when an IP is found, otherwise False
    :rtype: bool
    """
    try:
        ipaddress.ip_address(input_str)
        return True
    except ValueError:
        return False

validate(input_str)

This method is used when a String is validated as an IPAttribute if the string is an IP, it returns the AttributeType, otherwise None

it checks if the string is an IPv4 or IPv6 IP with or without a Port, or a CIDR Block

:param input_str: input string to validate :type input_str: str :return: returns the AttributeType when an IP is found, otherwise None :rtype: AttributeType | None

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def validate(self: Self, input_str: str) -> AttributeType | None:
    """
    This method is used when a String is validated as an IPAttribute
    if the string is an IP, it returns the AttributeType, otherwise None

    it checks if the string is an IPv4 or IPv6 IP with or without a Port, or a CIDR Block

    :param input_str: input string to validate
    :type input_str: str
    :return: returns the AttributeType when an IP is found, otherwise None
    :rtype: AttributeType | None
    """

    ip_without_port: str = input_str

    if self.__validate_ip(input_str):  # checks if the string is an IPv4 or IPv6 IP without a Port
        return AttributeType(types=["ip-dst", "ip-src", "ip-src/ip-dst"], default_type="ip-dst", value=input_str)

    if re.search(r"(:\d{2,5})", input_str):  # checks if the string has a port at the end
        port: str = input_str.split(":")[-1]
        ip_without_port = re.sub(r"(?<=:)\d+$", "", input_str).removesuffix(":")
        # removes [] from ipv6
        match = re.search(self.brackets_pattern, ip_without_port)
        if match:
            extracted_ipv6 = match.group(1)
            ip_without_port = ip_without_port.replace(match.group(0), extracted_ipv6)

        if self.__validate_ip(ip_without_port):
            return AttributeType(
                types=["ip-dst|port", "ip-src|port", "ip-src|port/ip-dst|port"],
                default_type="ip-dst|port",
                value=ip_without_port + "|" + port,
            )

    if ip_without_port.find("/"):  # check if it is a CIDR Block
        split_ip: list[str] = ip_without_port.split("/")
        if len(split_ip) == 2:
            if self.__validate_ip(split_ip[0]) and split_ip[1].isnumeric():
                return AttributeType(
                    types=["ip-dst", "ip-src", "ip-src/ip-dst"], default_type="ip-dst", value=ip_without_port
                )
    return None

PhonenumberTypeValidator

Bases: TypeValidator

This Class implements a validationmethod for phonenumbers

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
class PhonenumberTypeValidator(TypeValidator):
    """
    This Class implements a validationmethod for phonenumbers
    """

    date_regex = re.compile(r"^\d{4}-\d{2}-\d{2}$")
    phone_number_regex = re.compile(r"^(\+)?(\d{1,3}(\(0\))?)?[0-9/\-]{5,}\d$", re.IGNORECASE)

    def validate(self: Self, input_str: str) -> AttributeType | None:
        """
        This method is used when a String is validated as an PhoneNumberAttribute

        :param input_str: input string to validate
        :type input_str: str
        :return: returns the AttributeType when a PhoneNumber is found, otherwise None
        :rtype: AttributeType | None
        """
        if input_str.startswith("+") or (input_str.find("-") != -1):
            if not self.date_regex.match(input_str):  # checks if the string is not a date
                if self.phone_number_regex.match(input_str):  # checks if the string is a phone number
                    return AttributeType(
                        types=["phone-number", "prtn", "whois-registrant-phone"],
                        default_type="phone-number",
                        value=input_str,
                    )
        return None

validate(input_str)

This method is used when a String is validated as an PhoneNumberAttribute

:param input_str: input string to validate :type input_str: str :return: returns the AttributeType when a PhoneNumber is found, otherwise None :rtype: AttributeType | None

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def validate(self: Self, input_str: str) -> AttributeType | None:
    """
    This method is used when a String is validated as an PhoneNumberAttribute

    :param input_str: input string to validate
    :type input_str: str
    :return: returns the AttributeType when a PhoneNumber is found, otherwise None
    :rtype: AttributeType | None
    """
    if input_str.startswith("+") or (input_str.find("-") != -1):
        if not self.date_regex.match(input_str):  # checks if the string is not a date
            if self.phone_number_regex.match(input_str):  # checks if the string is a phone number
                return AttributeType(
                    types=["phone-number", "prtn", "whois-registrant-phone"],
                    default_type="phone-number",
                    value=input_str,
                )
    return None

TypeValidator

Bases: ABC

Abstract model of a Validator Object, which is used to decide, whether a String is a representation of a certain Attribute or not. It returns the Attribute Type when an Attribute is found, and None if not

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class TypeValidator(ABC):
    """
    Abstract model of a Validator Object, which is used to decide, whether a String is a representation
    of a certain Attribute or not. It returns the Attribute Type when an Attribute is found, and None if not
    """

    @abstractmethod
    def validate(self: Self, input_str: str) -> AttributeType | None:
        """
        This method is used when a String is validated as an Attribute

        :param input_str: the string to validate
        :type input_str: str
        :return: returns the AttributeType when an Attribute is found, and None if not
        :rtype: AttributeType | None
        """
        pass

validate(input_str) abstractmethod

This method is used when a String is validated as an Attribute

:param input_str: the string to validate :type input_str: str :return: returns the AttributeType when an Attribute is found, and None if not :rtype: AttributeType | None

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
43
44
45
46
47
48
49
50
51
52
53
@abstractmethod
def validate(self: Self, input_str: str) -> AttributeType | None:
    """
    This method is used when a String is validated as an Attribute

    :param input_str: the string to validate
    :type input_str: str
    :return: returns the AttributeType when an Attribute is found, and None if not
    :rtype: AttributeType | None
    """
    pass

resolve_filename(input_str)

This method is used to check if a string is a filename, by checking if it has a file extension(an alphanumeric not numeric string) or a drive letter

:param input_str: the string to check :type input_str: str :return: returns True when the string is a filename, otherwise False :rtype: bool

Source code in src/mmisp/worker/jobs/processfreetext/attribute_types/type_validator.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def resolve_filename(input_str: str) -> bool:
    """
    This method is used to check if a string is a filename, by checking if it has a file extension(an alphanumeric
    not numeric string) or a drive letter

    :param input_str: the string to check
    :type input_str: str
    :return: returns True when the string is a filename, otherwise False
    :rtype: bool
    """
    if re.match(r"^.:/", input_str) or "." in input_str:  # check if it is a drive letter or includes a dot
        split = input_str.split(".")
        if split and not split[-1].isnumeric() and split[-1].isalnum():
            return True  # when the last part is alphanumeric and not numeric
    return False

ProcessFreeTextData

Bases: BaseModel

Represents the input data of the ProcessFreeTextJob

Source code in src/mmisp/worker/jobs/processfreetext/job_data.py
 6
 7
 8
 9
10
11
class ProcessFreeTextData(BaseModel):
    """
    Represents the input data of the ProcessFreeTextJob
    """

    data: str

ProcessFreeTextResponse

Bases: BaseModel

Represents the response of the ProcessFreeTextJob

Source code in src/mmisp/worker/jobs/processfreetext/job_data.py
14
15
16
17
18
19
class ProcessFreeTextResponse(BaseModel):
    """
    Represents the response of the ProcessFreeTextJob
    """

    attributes: list[AttributeType]

ProcessfreetextConfigData

Bases: ConfigData

Encapsulates configuration for the processfreetext worker and its jobs.

Source code in src/mmisp/worker/jobs/processfreetext/processfreetext_config_data.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class ProcessfreetextConfigData(ConfigData):
    """
    Encapsulates configuration for the processfreetext worker and its jobs.
    """

    security_vendors: list[str] = ["virustotal.com", "hybrid-analysis.com"]
    """The security vendors to use for the processfreetext worker."""

    def __init__(self: Self) -> None:
        super().__init__()
        self.read_from_env()

    def read_from_env(self: Self) -> None:
        """
        Reads the configuration from the environment.
        """

        env_security_vendors: str = os.getenv(ENV_SECURITY_VENDORS, "")
        if env_security_vendors:
            self.security_vendors = env_security_vendors.split(",")

security_vendors: list[str] = ['virustotal.com', 'hybrid-analysis.com'] class-attribute instance-attribute

The security vendors to use for the processfreetext worker.

read_from_env()

Reads the configuration from the environment.

Source code in src/mmisp/worker/jobs/processfreetext/processfreetext_config_data.py
21
22
23
24
25
26
27
28
def read_from_env(self: Self) -> None:
    """
    Reads the configuration from the environment.
    """

    env_security_vendors: str = os.getenv(ENV_SECURITY_VENDORS, "")
    if env_security_vendors:
        self.security_vendors = env_security_vendors.split(",")

processfreetext_job(user, data)

celery task that processes the given free text and returns a list of found attributes

:param user: the user that requested the job :type user: UserData :param data: the data to process, containing the free text string :type data: ProcessFreeTextData :return: returns a list of found attributes :rtype: ProcessFreeTextData

Source code in src/mmisp/worker/jobs/processfreetext/processfreetext_job.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@celery_app.task
def processfreetext_job(user: UserData, data: ProcessFreeTextData) -> ProcessFreeTextResponse:
    """
    celery task that processes the given free text and returns a list of found attributes

    :param user: the user that requested the job
    :type user: UserData
    :param data: the data to process, containing the free text string
    :type data: ProcessFreeTextData
    :return: returns a list of found attributes
    :rtype: ProcessFreeTextData
    """
    found_attributes: list[AttributeType] = []
    word_list: list[str] = _split_text(data.data)
    for word in word_list:
        possible_attribute: AttributeType | None = _parse_attribute(word)
        if possible_attribute is not None:
            found_attributes.append(possible_attribute)
            logger.info(f"Found attribute: {possible_attribute}")
    logger.info("finished processing free text data")
    return ProcessFreeTextResponse(attributes=found_attributes)

EnrichAttributeData

Bases: BaseModel

Encapsulates the necessary data to create an enrich-attribute job.

Source code in src/mmisp/worker/jobs/enrichment/job_data.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
class EnrichAttributeData(BaseModel):
    """
    Encapsulates the necessary data to create an enrich-attribute job.
    """

    class Config:
        """
        Pydantic configuration.
        """

        allow_mutation: bool = False
        anystr_strip_whitespace: bool = True
        min_anystr_length: int = 1

    attribute_id: NonNegativeInt
    """The ID of the attribute to enrich."""
    enrichment_plugins: list[str]
    """The list of enrichment plugins to use for enrichment"""

attribute_id: NonNegativeInt instance-attribute

The ID of the attribute to enrich.

enrichment_plugins: list[str] instance-attribute

The list of enrichment plugins to use for enrichment

Config

Pydantic configuration.

Source code in src/mmisp/worker/jobs/enrichment/job_data.py
 9
10
11
12
13
14
15
16
class Config:
    """
    Pydantic configuration.
    """

    allow_mutation: bool = False
    anystr_strip_whitespace: bool = True
    min_anystr_length: int = 1

EnrichEventData

Bases: BaseModel

Encapsulates the data needed for an enrich-event job.

Source code in src/mmisp/worker/jobs/enrichment/job_data.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class EnrichEventData(BaseModel):
    """
    Encapsulates the data needed for an enrich-event job.
    """

    class Config:
        """
        Pydantic configuration.
        """

        allow_mutation: bool = False
        anystr_strip_whitespace: bool = True
        min_anystr_length: int = 1

    event_id: int
    """The ID of the event to enrich."""
    enrichment_plugins: list[str]
    """The list of enrichment plugins to use for enrichment"""

enrichment_plugins: list[str] instance-attribute

The list of enrichment plugins to use for enrichment

event_id: int instance-attribute

The ID of the event to enrich.

Config

Pydantic configuration.

Source code in src/mmisp/worker/jobs/enrichment/job_data.py
29
30
31
32
33
34
35
36
class Config:
    """
    Pydantic configuration.
    """

    allow_mutation: bool = False
    anystr_strip_whitespace: bool = True
    min_anystr_length: int = 1

EnrichEventResult

Bases: BaseModel

Encapsulates the result of an enrich-event job.

Contains the number of created attributes.

Source code in src/mmisp/worker/jobs/enrichment/job_data.py
44
45
46
47
48
49
50
51
52
class EnrichEventResult(BaseModel):
    """
    Encapsulates the result of an enrich-event job.

    Contains the number of created attributes.
    """

    created_attributes: NonNegativeInt = 0
    """The number of created attributes."""

created_attributes: NonNegativeInt = 0 class-attribute instance-attribute

The number of created attributes.

enrich_attribute(misp_attribute, enrichment_plugins)

Enriches the given event attribute with the specified plugins and returns the created attributes and tags.

:param misp_attribute: The attribute to enrich. :type misp_attribute: AttributeWithTagRelationship :param enrichment_plugins: The plugins to use for enriching the attribute. :type enrichment_plugins: list[str] :return: The created Attributes and Tags. :rtype: EnrichAttributeData

Source code in src/mmisp/worker/jobs/enrichment/enrich_attribute_job.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def enrich_attribute(
    misp_attribute: AttributeWithTagRelationship, enrichment_plugins: list[str]
) -> EnrichAttributeResult:
    """
    Enriches the given event attribute with the specified plugins and returns the created attributes and tags.

    :param misp_attribute: The attribute to enrich.
    :type misp_attribute: AttributeWithTagRelationship
    :param enrichment_plugins: The plugins to use for enriching the attribute.
    :type enrichment_plugins: list[str]
    :return: The created Attributes and Tags.
    :rtype: EnrichAttributeData
    """

    result: EnrichAttributeResult = EnrichAttributeResult()
    for plugin_name in enrichment_plugins:
        if enrichment_plugin_factory.is_plugin_registered(plugin_name):
            # Skip Plugins that are not compatible with the attribute.
            plugin_io: PluginIO = enrichment_plugin_factory.get_plugin_io(plugin_name)
            if misp_attribute.type not in plugin_io.INPUT:
                _logger.error(
                    f"Plugin {plugin_name} is not compatible with attribute type {misp_attribute.type}. "
                    f"Plugin execution will be skipped."
                )
                continue

            # Instantiate Plugin
            plugin: EnrichmentPlugin
            try:
                plugin = enrichment_plugin_factory.create(plugin_name, misp_attribute)
            except NotAValidPlugin as exception:
                _logger.exception(exception)
                continue

            # Execute Plugin and save result
            plugin_result: EnrichAttributeResult
            try:
                plugin_result = plugin.run()
            except Exception as exception:
                _logger.exception(f"Execution of plugin '{plugin_name}' failed. {exception}")
                continue

            if plugin_result:
                result.append(plugin_result)

        else:
            _logger.error(f"Plugin '{plugin_name}' is not registered. Cannot be used for enrichment.")

    return result

enrich_attribute_job(user_data, data)

Provides an implementation of the enrich-attribute job.

Takes a Misp event-attribute as input and runs specified plugins to enrich the attribute.

:param user_data: The user who created the job. (not used) :type user_data: UserData :param data: The data needed for the enrichment process. :type data: EnrichAttributeData :return: The created Attributes and Tags. :rtype: EnrichAttributeResult

Source code in src/mmisp/worker/jobs/enrichment/enrich_attribute_job.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@celery_app.task
def enrich_attribute_job(user_data: UserData, data: EnrichAttributeData) -> EnrichAttributeResult:
    """
    Provides an implementation of the enrich-attribute job.

    Takes a Misp event-attribute as input and runs specified plugins to enrich the attribute.

    :param user_data: The user who created the job. (not used)
    :type user_data: UserData
    :param data: The data needed for the enrichment process.
    :type data: EnrichAttributeData
    :return: The created Attributes and Tags.
    :rtype: EnrichAttributeResult
    """
    return asyncio.run(_enrich_attribute_job(user_data, data))

ENV_ENRICHMENT_PLUGIN_DIRECTORY = f'{ENV_PREFIX}_ENRICHMENT_PLUGIN_DIRECTORY' module-attribute

The name of the environment variable that configures the directory where enrichment plugins are loaded from.

EnrichmentConfigData

Bases: ConfigData

Encapsulates configuration for the enrichment worker and its jobs.

Source code in src/mmisp/worker/jobs/enrichment/enrichment_config_data.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class EnrichmentConfigData(ConfigData):
    """
    Encapsulates configuration for the enrichment worker and its jobs.
    """

    class Config:
        """
        Pydantic configuration.
        """

        validate_assignment: bool = True

    plugin_directory: str = _PLUGIN_DEFAULT_DIRECTORY
    """The directory where the plugins are stored."""

    @validator("plugin_directory")
    @classmethod
    def validate_plugin_module(cls: Type["EnrichmentConfigData"], value: str) -> str:
        """
        Validates the plugin_directory.
        If the module is not valid or could not be found a default value is assigned.
        :param value: The plugin_directory value.
        :type value: str
        :return: The given or a default plugin directory.
        """

        plugin_module: str = value.strip()

        if plugin_module:
            if os.path.isdir(plugin_module):
                return plugin_module
            else:
                _log.error(f"The given plugin directory {plugin_module} for enrichment plugins does not exist.")

        return _PLUGIN_DEFAULT_DIRECTORY

    def read_config_from_env(self: Self) -> None:
        """
        Reads the configuration of the enrichment worker from environment variables.
        """
        env = os.environ.get(ENV_ENRICHMENT_PLUGIN_DIRECTORY)
        if env:
            plugin_directory: str = env
            self.plugin_directory = plugin_directory

plugin_directory: str = _PLUGIN_DEFAULT_DIRECTORY class-attribute instance-attribute

The directory where the plugins are stored.

Config

Pydantic configuration.

Source code in src/mmisp/worker/jobs/enrichment/enrichment_config_data.py
23
24
25
26
27
28
class Config:
    """
    Pydantic configuration.
    """

    validate_assignment: bool = True

read_config_from_env()

Reads the configuration of the enrichment worker from environment variables.

Source code in src/mmisp/worker/jobs/enrichment/enrichment_config_data.py
54
55
56
57
58
59
60
61
def read_config_from_env(self: Self) -> None:
    """
    Reads the configuration of the enrichment worker from environment variables.
    """
    env = os.environ.get(ENV_ENRICHMENT_PLUGIN_DIRECTORY)
    if env:
        plugin_directory: str = env
        self.plugin_directory = plugin_directory

validate_plugin_module(value) classmethod

Validates the plugin_directory. If the module is not valid or could not be found a default value is assigned. :param value: The plugin_directory value. :type value: str :return: The given or a default plugin directory.

Source code in src/mmisp/worker/jobs/enrichment/enrichment_config_data.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@validator("plugin_directory")
@classmethod
def validate_plugin_module(cls: Type["EnrichmentConfigData"], value: str) -> str:
    """
    Validates the plugin_directory.
    If the module is not valid or could not be found a default value is assigned.
    :param value: The plugin_directory value.
    :type value: str
    :return: The given or a default plugin directory.
    """

    plugin_module: str = value.strip()

    if plugin_module:
        if os.path.isdir(plugin_module):
            return plugin_module
        else:
            _log.error(f"The given plugin directory {plugin_module} for enrichment plugins does not exist.")

    return _PLUGIN_DEFAULT_DIRECTORY

EnrichmentPlugin

Bases: Plugin

Interface for an enrichment plugin.

Provides functionality for enriching a given MISP Event-Attribute. Creates and returns new attributes and tags.

Source code in src/mmisp/worker/jobs/enrichment/plugins/enrichment_plugin.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class EnrichmentPlugin(Plugin):
    """
    Interface for an enrichment plugin.

    Provides functionality for enriching a given MISP Event-Attribute.
    Creates and returns new attributes and tags.
    """

    PLUGIN_INFO: EnrichmentPluginInfo
    """Information about the plugin."""

    @abc.abstractmethod
    def __init__(self: Self, misp_attribute: AttributeWithTagRelationship) -> None:
        """
        Creates a new enrichment plugin initialized with an event attribute.

        :param misp_attribute: The MISP Event-Attribute to enrich.
        :type misp_attribute: AttributeWithTagRelationship
        """
        ...

    @abc.abstractmethod
    def run(self: Self) -> EnrichAttributeResult:
        """
        Entry point for the plugin. Starts enrichment process and returns created attributes and tags.
        :return: The created (enriched) attributes and tags.
        :rtype: EnrichAttributeResult
        """
        ...

PLUGIN_INFO: EnrichmentPluginInfo instance-attribute

Information about the plugin.

__init__(misp_attribute) abstractmethod

Creates a new enrichment plugin initialized with an event attribute.

:param misp_attribute: The MISP Event-Attribute to enrich. :type misp_attribute: AttributeWithTagRelationship

Source code in src/mmisp/worker/jobs/enrichment/plugins/enrichment_plugin.py
21
22
23
24
25
26
27
28
29
@abc.abstractmethod
def __init__(self: Self, misp_attribute: AttributeWithTagRelationship) -> None:
    """
    Creates a new enrichment plugin initialized with an event attribute.

    :param misp_attribute: The MISP Event-Attribute to enrich.
    :type misp_attribute: AttributeWithTagRelationship
    """
    ...

run() abstractmethod

Entry point for the plugin. Starts enrichment process and returns created attributes and tags. :return: The created (enriched) attributes and tags. :rtype: EnrichAttributeResult

Source code in src/mmisp/worker/jobs/enrichment/plugins/enrichment_plugin.py
31
32
33
34
35
36
37
38
@abc.abstractmethod
def run(self: Self) -> EnrichAttributeResult:
    """
    Entry point for the plugin. Starts enrichment process and returns created attributes and tags.
    :return: The created (enriched) attributes and tags.
    :rtype: EnrichAttributeResult
    """
    ...

EnrichmentPluginFactory

Bases: PluginFactory[EnrichmentPlugin, EnrichmentPluginInfo]

Encapsulates a factory specifically for Enrichment Plugins.

Source code in src/mmisp/worker/jobs/enrichment/plugins/enrichment_plugin_factory.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class EnrichmentPluginFactory(PluginFactory[EnrichmentPlugin, EnrichmentPluginInfo]):
    """
    Encapsulates a factory specifically for Enrichment Plugins.
    """

    def create(self: Self, plugin_name: str, misp_attribute: AttributeWithTagRelationship) -> EnrichmentPlugin:
        """
        Creates an instance of a given plugin initialized with the specified event attribute.

        :param plugin_name: The name of the plugin.
        :type plugin_name: str
        :param misp_attribute: The MISP-Attribute to enrich.
        :type misp_attribute: AttributeWithTagRelationship
        :return: The instantiated enrichment plugin.
        :rtype: EnrichmentPlugin
        :raises PluginNotFound: If there is no plugin with the specified name.
        :raises NotAValidPlugin: If the constructor of the plugin does not match the interface.
        """

        if not self.is_plugin_registered(plugin_name):
            raise PluginNotFound(message=f"Unknown plugin '{plugin_name}'. Cannot be instantiated.")

        plugin_instance: EnrichmentPlugin
        try:
            plugin_instance = self._plugins[plugin_name](misp_attribute)
        except TypeError as type_error:
            raise NotAValidPlugin(message=f"Plugin '{plugin_name}' has incorrect constructor: {type_error}")
        except Exception as exception:
            raise NotAValidPlugin(message=f"Plugin '{plugin_name}' could not be instantiated: {exception}")

        return plugin_instance

    def get_plugin_io(self: Self, plugin_name: str) -> PluginIO:
        """
        Returns information about the accepted and returned attribute types of a given enrichment plugin.
        :param plugin_name: The name of the plugin.
        :type plugin_name: str
        :return: The accepted and returned types of attributes.
        :rtype: PluginIO
        """

        if not self.is_plugin_registered(plugin_name):
            raise PluginNotFound(message=f"Unknown plugin '{plugin_name}'.")

        return self.get_plugin_info(plugin_name).MISP_ATTRIBUTES

create(plugin_name, misp_attribute)

Creates an instance of a given plugin initialized with the specified event attribute.

:param plugin_name: The name of the plugin. :type plugin_name: str :param misp_attribute: The MISP-Attribute to enrich. :type misp_attribute: AttributeWithTagRelationship :return: The instantiated enrichment plugin. :rtype: EnrichmentPlugin :raises PluginNotFound: If there is no plugin with the specified name. :raises NotAValidPlugin: If the constructor of the plugin does not match the interface.

Source code in src/mmisp/worker/jobs/enrichment/plugins/enrichment_plugin_factory.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def create(self: Self, plugin_name: str, misp_attribute: AttributeWithTagRelationship) -> EnrichmentPlugin:
    """
    Creates an instance of a given plugin initialized with the specified event attribute.

    :param plugin_name: The name of the plugin.
    :type plugin_name: str
    :param misp_attribute: The MISP-Attribute to enrich.
    :type misp_attribute: AttributeWithTagRelationship
    :return: The instantiated enrichment plugin.
    :rtype: EnrichmentPlugin
    :raises PluginNotFound: If there is no plugin with the specified name.
    :raises NotAValidPlugin: If the constructor of the plugin does not match the interface.
    """

    if not self.is_plugin_registered(plugin_name):
        raise PluginNotFound(message=f"Unknown plugin '{plugin_name}'. Cannot be instantiated.")

    plugin_instance: EnrichmentPlugin
    try:
        plugin_instance = self._plugins[plugin_name](misp_attribute)
    except TypeError as type_error:
        raise NotAValidPlugin(message=f"Plugin '{plugin_name}' has incorrect constructor: {type_error}")
    except Exception as exception:
        raise NotAValidPlugin(message=f"Plugin '{plugin_name}' could not be instantiated: {exception}")

    return plugin_instance

get_plugin_io(plugin_name)

Returns information about the accepted and returned attribute types of a given enrichment plugin. :param plugin_name: The name of the plugin. :type plugin_name: str :return: The accepted and returned types of attributes. :rtype: PluginIO

Source code in src/mmisp/worker/jobs/enrichment/plugins/enrichment_plugin_factory.py
42
43
44
45
46
47
48
49
50
51
52
53
54
def get_plugin_io(self: Self, plugin_name: str) -> PluginIO:
    """
    Returns information about the accepted and returned attribute types of a given enrichment plugin.
    :param plugin_name: The name of the plugin.
    :type plugin_name: str
    :return: The accepted and returned types of attributes.
    :rtype: PluginIO
    """

    if not self.is_plugin_registered(plugin_name):
        raise PluginNotFound(message=f"Unknown plugin '{plugin_name}'.")

    return self.get_plugin_info(plugin_name).MISP_ATTRIBUTES

enrich_event_job(user_data, data)

Encapsulates a Job enriching a given MISP Event.

Job fetches MISP Attributes from a given Event and executes the specified enrichment plugins for each of these attributes. Newly created Attributes and Tags are attached to the Event in the MISP-Database.

:param user_data: The user who created the job. (not used) :type user_data: UserData :param data: The event id and enrichment plugins. :return: The number of newly created attributes. :rtype: EnrichEventResult

Source code in src/mmisp/worker/jobs/enrichment/enrich_event_job.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@celery_app.task
def enrich_event_job(user_data: UserData, data: EnrichEventData) -> EnrichEventResult:
    """
    Encapsulates a Job enriching a given MISP Event.

    Job fetches MISP Attributes from a given Event and executes the specified enrichment plugins
    for each of these attributes.
    Newly created Attributes and Tags are attached to the Event in the MISP-Database.

    :param user_data: The user who created the job. (not used)
    :type user_data: UserData
    :param data: The event id and enrichment plugins.
    :return: The number of newly created attributes.
    :rtype: EnrichEventResult
    """
    return asyncio.run(_enrich_event_job(user_data, data))

__regenerate_correlation_values(session, misp_api, correlation_threshold) async

Method to regenerate the amount of correlations for the values with correlations. :return: if the database was changed :rtype: bool

Source code in src/mmisp/worker/jobs/correlation/regenerate_occurrences_job.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
async def __regenerate_correlation_values(session: AsyncSession, misp_api: MispAPI, correlation_threshold: int) -> bool:
    """
    Method to regenerate the amount of correlations for the values with correlations.
    :return: if the database was changed
    :rtype: bool
    """
    changed: bool = False
    correlation_values: list[str] = await get_values_with_correlation(session)
    for value in correlation_values:
        count_correlations: int = await get_number_of_correlations(session, value, False)
        current_attributes: list[Attribute] = await get_attributes_with_same_value(session, value)
        count_possible_correlations: int = get_amount_of_possible_correlations(current_attributes)
        count_attributes: int = len(current_attributes)
        if count_attributes > correlation_threshold:
            await delete_correlations(session, value)
            await add_over_correlating_value(session, value, count_attributes)
            changed = True
        elif count_possible_correlations != count_correlations:
            await delete_correlations(session, value)
            await correlate_value(session, misp_api, correlation_threshold, value)
            changed = True
        elif count_possible_correlations == count_correlations == 0:
            await delete_correlations(session, value)
            changed = True
    return changed

__regenerate_over_correlating(session, misp_api, correlation_threshold) async

Method to regenerate the amount of correlations for the over correlating values. :return: if the database was changed :rtype: bool

Source code in src/mmisp/worker/jobs/correlation/regenerate_occurrences_job.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
async def __regenerate_over_correlating(session: AsyncSession, misp_api: MispAPI, correlation_threshold: int) -> bool:
    """
    Method to regenerate the amount of correlations for the over correlating values.
    :return: if the database was changed
    :rtype: bool
    """
    changed: bool = False
    over_correlating_values: list[tuple[str, int]] = await get_over_correlating_values(session)
    for entry in over_correlating_values:
        value: str = entry[0]
        count: int = entry[1]

        current_attributes: list[Attribute] = await get_attributes_with_same_value(session, value)
        count_attributes: int = len(current_attributes)

        if count_attributes != count and count_attributes > correlation_threshold:
            await add_over_correlating_value(session, value, count_attributes)
            changed = True
        elif count_attributes <= correlation_threshold:
            await delete_over_correlating_value(session, value)
            await correlate_value(session, misp_api, correlation_threshold, value)
            changed = True
    return changed

regenerate_occurrences_job(user)

Method to regenerate the occurrences of the correlations in the database. Over correlating values and values with correlations are checked. :param user: the user who requested the job :type user: UserData :return: if the job was successful and if the database was changed :rtype: DatabaseChangedResponse

Source code in src/mmisp/worker/jobs/correlation/regenerate_occurrences_job.py
24
25
26
27
28
29
30
31
32
33
34
@celery_app.task
def regenerate_occurrences_job(user: UserData) -> DatabaseChangedResponse:
    """
    Method to regenerate the occurrences of the correlations in the database.
    Over correlating values and values with correlations are checked.
    :param user: the user who requested the job
    :type user: UserData
    :return: if the job was successful and if the database was changed
    :rtype: DatabaseChangedResponse
    """
    return asyncio.run(_regenerate_occurrences_job(user))

clean_excluded_correlations_job(user)

Task to clean the excluded correlations from the correlations of the MISP database. For every excluded value the correlations are removed. :param user: the user who requested the job :type user: UserData :return: if the job was successful and if the database was changed :rtype: DatabaseChangedResponse

Source code in src/mmisp/worker/jobs/correlation/clean_excluded_correlations_job.py
10
11
12
13
14
15
16
17
18
19
20
21
@celery_app.task
def clean_excluded_correlations_job(user: UserData) -> DatabaseChangedResponse:
    """
    Task to clean the excluded correlations from the correlations of the MISP database.
    For every excluded value the correlations are removed.
    :param user: the user who requested the job
    :type user: UserData
    :return: if the job was successful and if the database was changed
    :rtype: DatabaseChangedResponse
    """

    return asyncio.run(_clean_excluded_correlations_job(user))

correlate_value(session, misp_api, correlation_threshold, value) async

Static method to correlate the given value based on the misp_sql database and misp_api interface. :param value: to correlate :param value: string :return: relevant information about the correlation :rtype: CorrelateValueResponse

Source code in src/mmisp/worker/jobs/correlation/correlate_value_job.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
async def correlate_value(
    session: AsyncSession, misp_api: MispAPI, correlation_threshold: int, value: str
) -> CorrelateValueResponse:
    """
    Static method to correlate the given value based on the misp_sql database and misp_api interface.
    :param value: to correlate
    :param value: string
    :return: relevant information about the correlation
    :rtype: CorrelateValueResponse
    """
    if await misp_sql.is_excluded_correlation(session, value):
        return CorrelateValueResponse(
            success=True,
            found_correlations=False,
            is_excluded_value=True,
            is_over_correlating_value=False,
            plugin_name=None,
            events=None,
        )
    attributes: list[Attribute] = await misp_sql.get_attributes_with_same_value(session, value)
    count: int = len(attributes)
    if count > correlation_threshold:
        await misp_sql.delete_correlations(session, value)
        await misp_sql.add_over_correlating_value(session, value, count)
        return CorrelateValueResponse(
            success=True,
            found_correlations=True,
            is_excluded_value=False,
            is_over_correlating_value=True,
            plugin_name=None,
            events=None,
        )
    elif count > 1:
        uuid_events: set[UUID] = await save_correlations(session, misp_api, attributes, value)
        return CorrelateValueResponse(
            success=True,
            found_correlations=(len(uuid_events) > 1),
            is_excluded_value=False,
            is_over_correlating_value=False,
            plugin_name=None,
            events=uuid_events,
        )
    else:
        return CorrelateValueResponse(
            success=True,
            found_correlations=False,
            is_excluded_value=False,
            is_over_correlating_value=False,
            plugin_name=None,
            events=None,
        )

correlate_value_job(user, correlate_value_data)

Method to execute the job. In CorrelateValueData is the value to correlate.

:param user: the user who requested the job :type user: UserData :param correlate_value_data: value to correlate :type correlate_value_data: CorrelateValue :return: relevant information about the correlation :rtype: CorrelateValueResponse

Source code in src/mmisp/worker/jobs/correlation/correlate_value_job.py
16
17
18
19
20
21
22
23
24
25
26
27
28
@celery_app.task
def correlate_value_job(user: UserData, correlate_value_data: CorrelateValueData) -> CorrelateValueResponse:
    """
    Method to execute the job. In CorrelateValueData is the value to correlate.

    :param user: the user who requested the job
    :type user: UserData
    :param correlate_value_data: value to correlate
    :type correlate_value_data: CorrelateValue
    :return: relevant information about the correlation
    :rtype: CorrelateValueResponse
    """
    return asyncio.run(_correlate_value_job(user, correlate_value_data))

__process_result(session, misp_api, plugin_name, value, result) async

Processes the result of the plugin. :param result: the result of the plugin :type result: InternPluginResult :return: a response with the result of the plugin :rtype: CorrelateValueResponse :raises: PluginExecutionException: If the result of the plugin is invalid.

Source code in src/mmisp/worker/jobs/correlation/correlation_plugin_job.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
async def __process_result(
    session: AsyncSession, misp_api: MispAPI, plugin_name: str, value: str, result: InternPluginResult | None
) -> CorrelateValueResponse:
    """
    Processes the result of the plugin.
    :param result: the result of the plugin
    :type result: InternPluginResult
    :return: a response with the result of the plugin
    :rtype: CorrelateValueResponse
    :raises: PluginExecutionException: If the result of the plugin is invalid.
    """
    if result is None:
        raise PluginExecutionException(message="The result of the plugin was None.")
    response: CorrelateValueResponse = CorrelateValueResponse(
        success=result.success,
        found_correlations=result.found_correlations,
        is_excluded_value=False,
        is_over_correlating_value=result.is_over_correlating_value,
        plugin_name=plugin_name,
    )
    if result.found_correlations and len(result.correlations) > 1:
        uuid_events: set[UUID] = await save_correlations(session, misp_api, result.correlations, value)
        response.events = uuid_events
    elif len(result.correlations) <= 1:
        response.found_correlations = False
    return response

correlation_plugin_job(user, data)

Method to execute a correlation plugin job. It creates a plugin based on the given data and runs it. Finally, it processes the result and returns a response.

:param user: the user who requested the job :type user: UserData :param data: specifies the value and the plugin to use :type data: CorrelationPluginJobData :return: a response with the result of the correlation by the plugin :rtype: CorrelateValueResponse

Source code in src/mmisp/worker/jobs/correlation/correlation_plugin_job.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@celery_app.task
def correlation_plugin_job(user: UserData, data: CorrelationPluginJobData) -> CorrelateValueResponse:
    """
    Method to execute a correlation plugin job.
    It creates a plugin based on the given data and runs it.
    Finally, it processes the result and returns a response.

    :param user: the user who requested the job
    :type user: UserData
    :param data: specifies the value and the plugin to use
    :type data: CorrelationPluginJobData
    :return: a response with the result of the correlation by the plugin
    :rtype: CorrelateValueResponse
    """
    return asyncio.run(_correlation_plugin_job(user, data))

CorrelationPluginInfo

Bases: PluginInfo

Class to hold information about a correlation plugin.

Source code in src/mmisp/worker/jobs/correlation/plugins/correlation_plugin_info.py
16
17
18
19
20
21
class CorrelationPluginInfo(PluginInfo):
    """
    Class to hold information about a correlation plugin.
    """

    CORRELATION_TYPE: CorrelationPluginType

CorrelationPluginType

Bases: str, Enum

Enum for the type of correlation plugin.

Source code in src/mmisp/worker/jobs/correlation/plugins/correlation_plugin_info.py
 6
 7
 8
 9
10
11
12
13
class CorrelationPluginType(str, Enum):
    """
    Enum for the type of correlation plugin.
    """

    ALL_CORRELATIONS = "all"
    SELECTED_CORRELATIONS = "selected"
    OTHER = "other"

CorrelationPlugin

Bases: Plugin

Class to be implemented by correlation plugins. It provides the basic functionality to run a correlation plugin.

Source code in src/mmisp/worker/jobs/correlation/plugins/correlation_plugin.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class CorrelationPlugin(Plugin):
    """
    Class to be implemented by correlation plugins. It provides the basic functionality to run a correlation plugin.
    """

    PLUGIN_INFO: CorrelationPluginInfo = Field(..., allow_mutation=False)

    async def run(self: Self) -> InternPluginResult | None:
        """
        Runs the plugin. To be implemented by the plugin.
        :return: the result of the plugin
        :rtype: InternPluginResult
        :raises: PluginExecutionException: If the plugin is executed but an error occurs.
        """
        pass

    def __init__(self: Self, value: str, misp_api: MispAPI, threshold: int) -> None:
        self.value: str = value
        self.misp_api: MispAPI = misp_api
        self.threshold: int = threshold

run() async

Runs the plugin. To be implemented by the plugin. :return: the result of the plugin :rtype: InternPluginResult :raises: PluginExecutionException: If the plugin is executed but an error occurs.

Source code in src/mmisp/worker/jobs/correlation/plugins/correlation_plugin.py
18
19
20
21
22
23
24
25
async def run(self: Self) -> InternPluginResult | None:
    """
    Runs the plugin. To be implemented by the plugin.
    :return: the result of the plugin
    :rtype: InternPluginResult
    :raises: PluginExecutionException: If the plugin is executed but an error occurs.
    """
    pass

correlation_plugin_factory = CorrelationPluginFactory() module-attribute

The factory to create correlation plugins for the whole application.

CorrelationPluginFactory

Bases: PluginFactory[CorrelationPlugin, CorrelationPluginInfo]

The factory to register and create correlation plugins.

Source code in src/mmisp/worker/jobs/correlation/plugins/correlation_plugin_factory.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class CorrelationPluginFactory(PluginFactory[CorrelationPlugin, CorrelationPluginInfo]):
    """
    The factory to register and create correlation plugins.
    """

    def create(self: Self, plugin_name: str, misp_value: str, misp_api: MispAPI, threshold: int) -> CorrelationPlugin:
        """
        Create an instance of a plugin.

        :param threshold: the current correlation threshold
        :type threshold: int
        :param misp_api: the misp api for the plugin to use
        :type misp_api: MispAPI
        :param plugin_name: The name of the plugin.
        :type plugin_name: str
        :param misp_value: The value to correlate.
        :type misp_value: str
        :return: The instantiated correlation plugin, initialized with the value.
        """
        if not self.is_plugin_registered(plugin_name):
            raise PluginNotFound(message=f"Unknown plugin '{plugin_name}'. Cannot be instantiated.")

        plugin_instance: CorrelationPlugin
        try:
            plugin_instance = self._plugins[plugin_name](misp_value, misp_api, threshold)
        except TypeError as type_error:
            raise NotAValidPlugin(message=f"Plugin '{plugin_name}' has incorrect constructor: {type_error}")

        return plugin_instance

create(plugin_name, misp_value, misp_api, threshold)

Create an instance of a plugin.

:param threshold: the current correlation threshold :type threshold: int :param misp_api: the misp api for the plugin to use :type misp_api: MispAPI :param plugin_name: The name of the plugin. :type plugin_name: str :param misp_value: The value to correlate. :type misp_value: str :return: The instantiated correlation plugin, initialized with the value.

Source code in src/mmisp/worker/jobs/correlation/plugins/correlation_plugin_factory.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def create(self: Self, plugin_name: str, misp_value: str, misp_api: MispAPI, threshold: int) -> CorrelationPlugin:
    """
    Create an instance of a plugin.

    :param threshold: the current correlation threshold
    :type threshold: int
    :param misp_api: the misp api for the plugin to use
    :type misp_api: MispAPI
    :param plugin_name: The name of the plugin.
    :type plugin_name: str
    :param misp_value: The value to correlate.
    :type misp_value: str
    :return: The instantiated correlation plugin, initialized with the value.
    """
    if not self.is_plugin_registered(plugin_name):
        raise PluginNotFound(message=f"Unknown plugin '{plugin_name}'. Cannot be instantiated.")

    plugin_instance: CorrelationPlugin
    try:
        plugin_instance = self._plugins[plugin_name](misp_value, misp_api, threshold)
    except TypeError as type_error:
        raise NotAValidPlugin(message=f"Plugin '{plugin_name}' has incorrect constructor: {type_error}")

    return plugin_instance

create_correlations(attributes, events, objects, value_id)

Method to create DefaultCorrelation objects based on the given list of MispEventAttribute und list of AddEditGetEventDetails. For every attribute a correlation is created with any other attribute in the list (except itself). The MispEventAttribute at place i in the list has to be an attribute of the AddEditGetEventDetails at place i in the list of AddEditGetEventDetails to function properly.

:param attributes: list of MispEventAttribute to create correlations from :param events: list of the MispEvents the MispEventAttribute occurs in :param value_id: the id of the value for the correlation :return: a list of DefaultCorrelation

Source code in src/mmisp/worker/jobs/correlation/utility.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def create_correlations(
    attributes: list[Attribute],
    events: list[AddEditGetEventDetails],
    objects: list[ObjectWithAttributesResponse],
    value_id: int,
) -> list[DefaultCorrelation]:
    """
    Method to create DefaultCorrelation objects based on the given list of MispEventAttribute und list of
    AddEditGetEventDetails. For every attribute a correlation is created with any other attribute in the list
    (except itself). The MispEventAttribute at place i in the list has to be an attribute of the AddEditGetEventDetails
    at place i in the list of AddEditGetEventDetails to function properly.

    :param attributes: list of MispEventAttribute to create correlations from
    :param events: list of the MispEvents the MispEventAttribute occurs in
    :param value_id: the id of the value for the correlation
    :return: a list of DefaultCorrelation
    """
    correlations = [
        _create_correlation_from_attributes(a1, e1, o1, a2, e2, o2, value_id)
        for ((a1, e1, o1), (a2, e2, o2)) in combinations(zip(attributes, events, objects), 2)
        if a1.event_id != a2.event_id
    ]

    return correlations

get_amount_of_possible_correlations(attributes)

Method to calculate the amount of possible correlations for the given list of Attribute. The amount of possible correlations is the amount of attributes minus the amount of attributes which are in the same event. :param attributes: the attributes to calculate the amount of possible correlations for :type attributes: list[Attribute] :return: the amount of possible correlations :rtype: int

Source code in src/mmisp/worker/jobs/correlation/utility.py
121
122
123
124
125
126
127
128
129
130
131
def get_amount_of_possible_correlations(attributes: list[Attribute]) -> int:
    """
    Method to calculate the amount of possible correlations for the given list of Attribute.
    The amount of possible correlations is the amount of attributes minus the amount of attributes which are in the same
    event.
    :param attributes: the attributes to calculate the amount of possible correlations for
    :type attributes: list[Attribute]
    :return: the amount of possible correlations
    :rtype: int
    """
    return sum(1 for a1, a2 in combinations(attributes, 2) if a1.event_id != a2.event_id)

save_correlations(db, misp_api, attributes, value) async

Method to generate DefaultCorrelation objects from the given list of MispEventAttribute and save them in the database. All MispEventAttribute in the list have to be attributes which have the same value and are correlated with each other. :param attributes: the attributes to correlate with each other :type attributes: list[Attribute] :param value: on which the correlations are based :type value: str :return: a set of UUIDs representing the events the correlation are associated with :rtype: set[UUID]

Source code in src/mmisp/worker/jobs/correlation/utility.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
async def save_correlations(db: AsyncSession, misp_api: MispAPI, attributes: list[Attribute], value: str) -> set[UUID]:
    """
    Method to generate DefaultCorrelation objects from the given list of MispEventAttribute and save them in the
    database. All MispEventAttribute in the list have to be attributes which have the same value and are correlated
    with each other.
    :param attributes: the attributes to correlate with each other
    :type attributes: list[Attribute]
    :param value: on which the correlations are based
    :type value: str
    :return: a set of UUIDs representing the events the correlation are associated with
    :rtype: set[UUID]
    """
    value_id: int = await misp_sql.add_correlation_value(db, value)
    events: list[AddEditGetEventDetails] = list()
    objects: list[ObjectWithAttributesResponse] = list()
    for attribute in attributes:
        events.append(await misp_api.get_event(attribute.event_id))
        objects.append(await misp_api.get_object(attribute.object_id))
    correlations = create_correlations(attributes, events, objects, value_id)
    await misp_sql.add_correlations(db, correlations)
    result: list[UUID] = list()
    for event in events:
        result.append(UUID(event.uuid))
    return set(result)

ChangeThresholdData

Bases: BaseModel

Data to change the threshold.

Source code in src/mmisp/worker/jobs/correlation/job_data.py
67
68
69
70
71
72
class ChangeThresholdData(BaseModel):
    """
    Data to change the threshold.
    """

    new_threshold: int

ChangeThresholdResponse

Bases: BaseModel

Response for the change of the threshold.

Source code in src/mmisp/worker/jobs/correlation/job_data.py
40
41
42
43
44
45
46
47
class ChangeThresholdResponse(BaseModel):
    """
    Response for the change of the threshold.
    """

    saved: bool
    valid_threshold: bool
    new_threshold: Optional[int] = None

CorrelateValueData

Bases: BaseModel

Data for the correlation of a value.

Source code in src/mmisp/worker/jobs/correlation/job_data.py
59
60
61
62
63
64
class CorrelateValueData(BaseModel):
    """
    Data for the correlation of a value.
    """

    value: str

CorrelateValueResponse

Bases: BaseModel

Response for the correlation of a value.

Source code in src/mmisp/worker/jobs/correlation/job_data.py
 9
10
11
12
13
14
15
16
17
18
19
class CorrelateValueResponse(BaseModel):
    """
    Response for the correlation of a value.
    """

    success: bool
    found_correlations: bool
    is_excluded_value: bool
    is_over_correlating_value: bool
    plugin_name: Optional[str] = None
    events: Optional[set[UUID]] = None

CorrelationPluginJobData

Bases: BaseModel

Data for a correlation plugin job.

Source code in src/mmisp/worker/jobs/correlation/job_data.py
50
51
52
53
54
55
56
class CorrelationPluginJobData(BaseModel):
    """
    Data for a correlation plugin job.
    """

    value: str
    correlation_plugin_name: str

DatabaseChangedResponse

Bases: BaseModel

Response for jobs that only change the database.

Source code in src/mmisp/worker/jobs/correlation/job_data.py
31
32
33
34
35
36
37
class DatabaseChangedResponse(BaseModel):
    """
    Response for jobs that only change the database.
    """

    success: bool
    database_changed: bool

InternPluginResult

Bases: BaseModel

Result of a plugin to process by the job.

Source code in src/mmisp/worker/jobs/correlation/job_data.py
75
76
77
78
79
80
81
82
83
84
85
86
class InternPluginResult(BaseModel):
    """
    Result of a plugin to process by the job.
    """

    success: bool
    found_correlations: bool
    is_over_correlating_value: bool
    correlations: list[Attribute]

    class Config:
        arbitrary_types_allowed = True

TopCorrelationsResponse

Bases: BaseModel

Response for the top correlations job.

Source code in src/mmisp/worker/jobs/correlation/job_data.py
22
23
24
25
26
27
28
class TopCorrelationsResponse(BaseModel):
    """
    Response for the top correlations job.
    """

    success: bool
    top_correlations: list[tuple[str, int]]

ENV_CORRELATION_PLUGIN_DIRECTORY = f'{ENV_PREFIX}_CORRELATION_PLUGIN_DIRECTORY' module-attribute

The name of the environment variable that configures the directory where correlation plugins are loaded from.

PLUGIN_DEFAULT_DIRECTORY: str = '' module-attribute

The default package used for correlation plugins.

CorrelationConfigData

Bases: ConfigData

Encapsulates configuration for the correlation worker and its jobs.

Source code in src/mmisp/worker/jobs/correlation/correlation_config_data.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class CorrelationConfigData(ConfigData):
    """
    Encapsulates configuration for the correlation worker and its jobs.
    """

    class Config:
        """
        Pydantic configuration.
        """

        validate_assignment: bool = True

    plugin_directory: str = PLUGIN_DEFAULT_DIRECTORY
    """The directory where the plugins are stored."""

    @validator("plugin_directory")
    @classmethod
    def validate_plugin_module(cls: Type["CorrelationConfigData"], value: str) -> str:
        """
        Validates the plugin_directory.
        If the module is not valid or could not be found a default value is assigned.
        :param value: The plugin_directory value.
        :type value: str
        :return: The given or a default plugin directory.
        """

        plugin_module: str = value.strip()

        if plugin_module:
            if os.path.isdir(plugin_module):
                return plugin_module
            else:
                _log.error(f"The given plugin directory '{plugin_module}' for correlation plugins does not exist.")

        return PLUGIN_DEFAULT_DIRECTORY

    def read_config_from_env(self: Self) -> None:
        """
        Reads the configuration of the correlation worker from environment variables.
        """
        env_plugin = os.environ.get(ENV_CORRELATION_PLUGIN_DIRECTORY)
        if env_plugin:
            self.plugin_directory = env_plugin

plugin_directory: str = PLUGIN_DEFAULT_DIRECTORY class-attribute instance-attribute

The directory where the plugins are stored.

Config

Pydantic configuration.

Source code in src/mmisp/worker/jobs/correlation/correlation_config_data.py
23
24
25
26
27
28
class Config:
    """
    Pydantic configuration.
    """

    validate_assignment: bool = True

read_config_from_env()

Reads the configuration of the correlation worker from environment variables.

Source code in src/mmisp/worker/jobs/correlation/correlation_config_data.py
54
55
56
57
58
59
60
def read_config_from_env(self: Self) -> None:
    """
    Reads the configuration of the correlation worker from environment variables.
    """
    env_plugin = os.environ.get(ENV_CORRELATION_PLUGIN_DIRECTORY)
    if env_plugin:
        self.plugin_directory = env_plugin

validate_plugin_module(value) classmethod

Validates the plugin_directory. If the module is not valid or could not be found a default value is assigned. :param value: The plugin_directory value. :type value: str :return: The given or a default plugin directory.

Source code in src/mmisp/worker/jobs/correlation/correlation_config_data.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@validator("plugin_directory")
@classmethod
def validate_plugin_module(cls: Type["CorrelationConfigData"], value: str) -> str:
    """
    Validates the plugin_directory.
    If the module is not valid or could not be found a default value is assigned.
    :param value: The plugin_directory value.
    :type value: str
    :return: The given or a default plugin directory.
    """

    plugin_module: str = value.strip()

    if plugin_module:
        if os.path.isdir(plugin_module):
            return plugin_module
        else:
            _log.error(f"The given plugin directory '{plugin_module}' for correlation plugins does not exist.")

    return PLUGIN_DEFAULT_DIRECTORY

top_correlations_job(user)

Method to get a list of all correlations with their occurrence in the database. The list is sorted decreasing by the occurrence. :param user: the user who requested the job :type user: UserData :return: TopCorrelationsResponse with the list and if the job was successful :rtype: TopCorrelationsResponse

Source code in src/mmisp/worker/jobs/correlation/top_correlations_job.py
10
11
12
13
14
15
16
17
18
19
20
@celery_app.task
def top_correlations_job(user: UserData) -> TopCorrelationsResponse:
    """
    Method to get a list of all correlations with their occurrence in the database.
    The list is sorted decreasing by the occurrence.
    :param user: the user who requested the job
    :type user: UserData
    :return: TopCorrelationsResponse with the list and if the job was successful
    :rtype: TopCorrelationsResponse
    """
    return asyncio.run(_top_correlations_job(user))

alert_email_job(user, data)

prepares an alert email by filling and rendering a template. afterward it will be sent to all specified users. :param user: the user who requested the job :type user: UserData :param data: contains data for the template and the user ids who will receive the emails. :type data: AlertEmailData

Source code in src/mmisp/worker/jobs/email/alert_email_job.py
22
23
24
25
26
27
28
29
30
31
@celery_app.task
def alert_email_job(user: UserData, data: AlertEmailData) -> None:
    """
    prepares an alert email by filling and rendering a template. afterward it will be sent to all specified users.
    :param user: the user who requested the job
    :type user: UserData
    :param data: contains data for the template and the user ids who will receive the emails.
    :type data: AlertEmailData
    """
    return asyncio.run(_alert_email_job(user, data))

contact_email_job(requester, data)

Prepares a contact email by filling and rendering a template. Afterward it will be sent to all specified users. :param requester: is the user who wants to contact the users :type requester: UserData :param data: contains data for the template and the user ids who will receive the emails. :type data: ContactEmailData

Source code in src/mmisp/worker/jobs/email/contact_email_job.py
21
22
23
24
25
26
27
28
29
30
@celery_app.task
def contact_email_job(requester: UserData, data: ContactEmailData) -> None:
    """
    Prepares a contact email by filling and rendering a template. Afterward it will be sent to all specified users.
    :param requester: is the user who wants to contact the users
    :type requester: UserData
    :param data: contains data for the template and the user ids who will receive the emails.
    :type data: ContactEmailData
    """
    asyncio.run(_contact_email_job(requester, data))

posts_email_job(user, data)

Prepares a posts email by filling and rendering a template. Afterward it will be sent to all specified users. :param user: the user who requested the job :type user: UserData :param data: contains data for the template and the user ids who will receive the emails. :type data: PostsEmailData

Source code in src/mmisp/worker/jobs/email/posts_email_job.py
23
24
25
26
27
28
29
30
31
32
@celery_app.task
def posts_email_job(user: UserData, data: PostsEmailData) -> None:
    """
    Prepares a posts email by filling and rendering a template. Afterward it will be sent to all specified users.
    :param user: the user who requested the job
    :type user: UserData
    :param data: contains data for the template and the user ids who will receive the emails.
    :type data: PostsEmailData
    """
    return asyncio.run(_posts_email_job(user, data))

AlertEmailData

Bases: BaseModel

Encapsulates the necessary data to send and create an alert email.

Source code in src/mmisp/worker/jobs/email/job_data.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
class AlertEmailData(BaseModel):
    """
    Encapsulates the necessary data to send and create an alert email.
    """

    receiver_ids: list[int]
    """The ids of the receivers"""
    event_id: int
    """The id of the event which triggered the alert"""
    old_publish: str
    """The timestamp of old publishing"""

event_id: int instance-attribute

The id of the event which triggered the alert

old_publish: str instance-attribute

The timestamp of old publishing

receiver_ids: list[int] instance-attribute

The ids of the receivers

ContactEmailData

Bases: BaseModel

Encapsulates the necessary data to send and create a contact email.

Source code in src/mmisp/worker/jobs/email/job_data.py
17
18
19
20
21
22
23
24
25
26
27
class ContactEmailData(BaseModel):
    """
    Encapsulates the necessary data to send and create a contact email.
    """

    event_id: int
    """The id of the event which the user wants to know more about"""
    message: str
    """The custom message of the user"""
    receiver_ids: list[int]
    """The ids of the receivers"""

event_id: int instance-attribute

The id of the event which the user wants to know more about

message: str instance-attribute

The custom message of the user

receiver_ids: list[int] instance-attribute

The ids of the receivers

PostsEmailData

Bases: BaseModel

Encapsulates the necessary data to send and create a posts email.

Source code in src/mmisp/worker/jobs/email/job_data.py
30
31
32
33
34
35
36
37
38
39
40
41
42
class PostsEmailData(BaseModel):
    """
    Encapsulates the necessary data to send and create a posts email.
    """

    post_id: int
    """The id of the post where something new was posted"""
    title: str
    """The title of the post"""
    message: str
    """The message which was posted at the post"""
    receiver_ids: list[int]
    """The ids of the receivers"""

message: str instance-attribute

The message which was posted at the post

post_id: int instance-attribute

The id of the post where something new was posted

receiver_ids: list[int] instance-attribute

The ids of the receivers

title: str instance-attribute

The title of the post

SmtpClient

Provides methods to build an SMTP connection to send emails.

Source code in src/mmisp/worker/jobs/email/utility/smtp_client.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class SmtpClient:
    """
    Provides methods to build an SMTP connection to send emails.
    """

    def __init__(self: Self, host: str, port: int) -> None:
        """
        Initializes a new SMTP object.
        :param port: is the port of the SMTP server
        :type port: int
        :param host: is the host of the SMTP server
        :type host: str
        """
        self.__smtp: SMTP = smtplib.SMTP(host, port)

    def open_smtp_connection(self: Self, email: str, password: str) -> None:
        """
        Connects to the SMTP server and logs in with the misp email.
        If no password is given, the connection will be established without a password.
        :param email: is the email of misp
        :type email: str
        :param password: is the password of the email
        :type password: str
        """
        self.__smtp.ehlo()
        self.__smtp.starttls()
        self.__smtp.ehlo()
        self.__smtp.login(user=email, password=password)

    def send_email(self: Self, from_addr: str, to_addr: str, email: str) -> None:
        """
        Sends an email.
        :param from_addr: is the address of the sender (misp email9
        :type from_addr: str
        :param to_addr: is the address of the receiver (user)
        :type to_addr: str
        :param email: is the content of the email
        :type email: str
        """
        try:
            self.__smtp.sendmail(from_addr, to_addr, email)
        except smtplib.SMTPRecipientsRefused as e:
            log.warning(f"Email to {to_addr} was refused: {e}")

    def close_smtp_connection(self: Self) -> None:
        """
        Closes the SMTP Connection.
        """
        self.__smtp.close()

__init__(host, port)

Initializes a new SMTP object. :param port: is the port of the SMTP server :type port: int :param host: is the host of the SMTP server :type host: str

Source code in src/mmisp/worker/jobs/email/utility/smtp_client.py
14
15
16
17
18
19
20
21
22
def __init__(self: Self, host: str, port: int) -> None:
    """
    Initializes a new SMTP object.
    :param port: is the port of the SMTP server
    :type port: int
    :param host: is the host of the SMTP server
    :type host: str
    """
    self.__smtp: SMTP = smtplib.SMTP(host, port)

close_smtp_connection()

Closes the SMTP Connection.

Source code in src/mmisp/worker/jobs/email/utility/smtp_client.py
53
54
55
56
57
def close_smtp_connection(self: Self) -> None:
    """
    Closes the SMTP Connection.
    """
    self.__smtp.close()

open_smtp_connection(email, password)

Connects to the SMTP server and logs in with the misp email. If no password is given, the connection will be established without a password. :param email: is the email of misp :type email: str :param password: is the password of the email :type password: str

Source code in src/mmisp/worker/jobs/email/utility/smtp_client.py
24
25
26
27
28
29
30
31
32
33
34
35
36
def open_smtp_connection(self: Self, email: str, password: str) -> None:
    """
    Connects to the SMTP server and logs in with the misp email.
    If no password is given, the connection will be established without a password.
    :param email: is the email of misp
    :type email: str
    :param password: is the password of the email
    :type password: str
    """
    self.__smtp.ehlo()
    self.__smtp.starttls()
    self.__smtp.ehlo()
    self.__smtp.login(user=email, password=password)

send_email(from_addr, to_addr, email)

Sends an email. :param from_addr: is the address of the sender (misp email9 :type from_addr: str :param to_addr: is the address of the receiver (user) :type to_addr: str :param email: is the content of the email :type email: str

Source code in src/mmisp/worker/jobs/email/utility/smtp_client.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def send_email(self: Self, from_addr: str, to_addr: str, email: str) -> None:
    """
    Sends an email.
    :param from_addr: is the address of the sender (misp email9
    :type from_addr: str
    :param to_addr: is the address of the receiver (user)
    :type to_addr: str
    :param email: is the content of the email
    :type email: str
    """
    try:
        self.__smtp.sendmail(from_addr, to_addr, email)
    except smtplib.SMTPRecipientsRefused as e:
        log.warning(f"Email to {to_addr} was refused: {e}")

EmailConfigData

Bases: ConfigData

Encapsulates configuration for the email worker and its jobs.

Source code in src/mmisp/worker/jobs/email/utility/email_config_data.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class EmailConfigData(ConfigData):
    """
    Encapsulates configuration for the email worker and its jobs.
    """

    mmisp_url: str = "http://127.0.0.1"
    """The url of MISP"""
    email_subject_string: str = "tlp"
    """The tlp string to search for an email subject"""
    mmisp_email_address: str = "misp@localhost"
    """The email of MISP"""
    mmisp_email_username: str = "misp"
    """The username of the MISP email"""
    mmisp_email_password: str = ""
    """The password of the MISP email"""
    mmisp_smtp_port: NonNegativeInt = 25
    """The port of the SMTP server"""
    mmisp_smtp_host: str = "localhost"
    """The host of the SMTP server"""

    def __init__(self: Self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.read_from_env()

    def read_from_env(self: Self) -> None:
        """
        Reads the configuration from the environment.
        """

        env_dict: dict = {
            "mmisp_url": os.environ.get(ENV_URL),
            "email_subject_string": os.environ.get(ENV_EMAIL_SUBJECT_STRING),
            "mmisp_email_address": os.environ.get(ENV_EMAIL_ADDRESS),
            "mmisp_email_username": os.environ.get(ENV_EMAIL_USERNAME),
            "mmisp_email_password": os.environ.get(ENV_EMAIL_PASSWORD, ""),
            "mmisp_smtp_port": os.environ.get(ENV_SMTP_PORT),
            "mmisp_smtp_host": os.environ.get(ENV_SMTP_HOST),
        }

        for env in env_dict:
            value: str = env_dict[env]
            if value:
                try:
                    setattr(self, env, value)
                except ValidationError as validation_error:
                    _log.exception(f"Error while reading {env} from environment: {validation_error}")

email_subject_string: str = 'tlp' class-attribute instance-attribute

The tlp string to search for an email subject

mmisp_email_address: str = 'misp@localhost' class-attribute instance-attribute

The email of MISP

mmisp_email_password: str = '' class-attribute instance-attribute

The password of the MISP email

mmisp_email_username: str = 'misp' class-attribute instance-attribute

The username of the MISP email

mmisp_smtp_host: str = 'localhost' class-attribute instance-attribute

The host of the SMTP server

mmisp_smtp_port: NonNegativeInt = 25 class-attribute instance-attribute

The port of the SMTP server

mmisp_url: str = 'http://127.0.0.1' class-attribute instance-attribute

The url of MISP

read_from_env()

Reads the configuration from the environment.

Source code in src/mmisp/worker/jobs/email/utility/email_config_data.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def read_from_env(self: Self) -> None:
    """
    Reads the configuration from the environment.
    """

    env_dict: dict = {
        "mmisp_url": os.environ.get(ENV_URL),
        "email_subject_string": os.environ.get(ENV_EMAIL_SUBJECT_STRING),
        "mmisp_email_address": os.environ.get(ENV_EMAIL_ADDRESS),
        "mmisp_email_username": os.environ.get(ENV_EMAIL_USERNAME),
        "mmisp_email_password": os.environ.get(ENV_EMAIL_PASSWORD, ""),
        "mmisp_smtp_port": os.environ.get(ENV_SMTP_PORT),
        "mmisp_smtp_host": os.environ.get(ENV_SMTP_HOST),
    }

    for env in env_dict:
        value: str = env_dict[env]
        if value:
            try:
                setattr(self, env, value)
            except ValidationError as validation_error:
                _log.exception(f"Error while reading {env} from environment: {validation_error}")

UtilityEmail

Provides functionality to built emails.

Source code in src/mmisp/worker/jobs/email/utility/utility_email.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class UtilityEmail:
    """
    Provides functionality to built emails.
    """

    @staticmethod
    def get_email_subject_mark_for_event(event: AddEditGetEventDetails, email_subject_string: str) -> str:
        """
        Returns the tlp tag of the given event as a subject for emails.

        :param event: the event to get the subject for
        :type event: AddEditGetEventDetails
        :param email_subject_string: is the tlp string to search
        :type email_subject_string: str
        :return: the tlp tag of the event
        :rtype: str
        """
        if event.Tag is not None:
            for tag in event.Tag:
                if email_subject_string in tag.name:
                    return tag.name

        return email_subject_string

    @staticmethod
    async def send_emails(
        misp_api: MispAPI,
        misp_email_address: str,
        email_username: str,
        email_password: str,
        smtp_port: int,
        smtp_host: str,
        receiver_ids: list[int],
        email_msg: EmailMessage,
    ) -> None:
        """
        Sends emails to the given users by opening an SMTP connection

        :param misp_email_address: is the email of misp
        :type misp_email_address: str
        :param email_username: is the username of misp
        :type email_username: str
        :param email_password: is the password of misp
        :type email_password: str
        :param smtp_port: is the port of the SMTP server
        :type smtp_port: int
        :param smtp_host: is the host of the SMTP server
        :type smtp_host: str
        :param receiver_ids: are the ids of the users who get the email
        :type receiver_ids: list[int]
        :param email_msg: is the email which will be sent
        :type email_msg: EmailMessage
        """
        smtp_client: SmtpClient = SmtpClient(smtp_host, smtp_port)

        smtp_client.open_smtp_connection(email_username, email_password)

        for receiver_id in receiver_ids:
            user: MispUser = await misp_api.get_user(receiver_id)
            email_msg["To"] = user.email
            smtp_client.send_email(misp_email_address, user.email, email_msg.as_string())
            del email_msg["To"]

        smtp_client.close_smtp_connection()

get_email_subject_mark_for_event(event, email_subject_string) staticmethod

Returns the tlp tag of the given event as a subject for emails.

:param event: the event to get the subject for :type event: AddEditGetEventDetails :param email_subject_string: is the tlp string to search :type email_subject_string: str :return: the tlp tag of the event :rtype: str

Source code in src/mmisp/worker/jobs/email/utility/utility_email.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@staticmethod
def get_email_subject_mark_for_event(event: AddEditGetEventDetails, email_subject_string: str) -> str:
    """
    Returns the tlp tag of the given event as a subject for emails.

    :param event: the event to get the subject for
    :type event: AddEditGetEventDetails
    :param email_subject_string: is the tlp string to search
    :type email_subject_string: str
    :return: the tlp tag of the event
    :rtype: str
    """
    if event.Tag is not None:
        for tag in event.Tag:
            if email_subject_string in tag.name:
                return tag.name

    return email_subject_string

send_emails(misp_api, misp_email_address, email_username, email_password, smtp_port, smtp_host, receiver_ids, email_msg) async staticmethod

Sends emails to the given users by opening an SMTP connection

:param misp_email_address: is the email of misp :type misp_email_address: str :param email_username: is the username of misp :type email_username: str :param email_password: is the password of misp :type email_password: str :param smtp_port: is the port of the SMTP server :type smtp_port: int :param smtp_host: is the host of the SMTP server :type smtp_host: str :param receiver_ids: are the ids of the users who get the email :type receiver_ids: list[int] :param email_msg: is the email which will be sent :type email_msg: EmailMessage

Source code in src/mmisp/worker/jobs/email/utility/utility_email.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@staticmethod
async def send_emails(
    misp_api: MispAPI,
    misp_email_address: str,
    email_username: str,
    email_password: str,
    smtp_port: int,
    smtp_host: str,
    receiver_ids: list[int],
    email_msg: EmailMessage,
) -> None:
    """
    Sends emails to the given users by opening an SMTP connection

    :param misp_email_address: is the email of misp
    :type misp_email_address: str
    :param email_username: is the username of misp
    :type email_username: str
    :param email_password: is the password of misp
    :type email_password: str
    :param smtp_port: is the port of the SMTP server
    :type smtp_port: int
    :param smtp_host: is the host of the SMTP server
    :type smtp_host: str
    :param receiver_ids: are the ids of the users who get the email
    :type receiver_ids: list[int]
    :param email_msg: is the email which will be sent
    :type email_msg: EmailMessage
    """
    smtp_client: SmtpClient = SmtpClient(smtp_host, smtp_port)

    smtp_client.open_smtp_connection(email_username, email_password)

    for receiver_id in receiver_ids:
        user: MispUser = await misp_api.get_user(receiver_id)
        email_msg["To"] = user.email
        smtp_client.send_email(misp_email_address, user.email, email_msg.as_string())
        del email_msg["To"]

    smtp_client.close_smtp_connection()

app = init_app() module-attribute

The FastAPI instance.

main()

The entry point of the MMISP Worker application. Starts the enabled workers and sets up the API.

Source code in src/mmisp/worker/main.py
41
42
43
44
45
46
47
48
49
def main() -> None:
    """
    The entry point of the MMISP Worker application.
    Starts the enabled workers and sets up the API.
    """

    config: SystemConfigData = system_config_data

    uvicorn.run(f"{__name__}:app", port=int(config.api_port), log_level="info", host=config.api_host)

InvalidPluginResult

Bases: Exception

Exception that is raised when a plugin returns an invalid result that can not be utilized.

Source code in src/mmisp/worker/exceptions/plugin_exceptions.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class InvalidPluginResult(Exception):
    """
    Exception that is raised when a plugin returns an invalid result that can not be utilized.
    """

    def __init__(self: Self, plugin_name: str = "", message: str = "") -> None:
        default_message: str = "The result of the executed plugin is not valid and can not be utilized."
        if message:
            self.message = message
        elif plugin_name:
            self.message = f"The result provided by the plugin '{plugin_name}' is not valid."
        else:
            self.message = default_message
        super().__init__(self.message)

NotAValidPlugin

Bases: Exception

Exception that is raised when a class does not match the requirements of a valid plugin.

Source code in src/mmisp/worker/exceptions/plugin_exceptions.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class NotAValidPlugin(Exception):
    """
    Exception that is raised when a class does not match the requirements of a valid plugin.
    """

    def __init__(self: Self, plugin_name: str | None = None, message: str = "") -> None:
        default_message: str = "The requested Plugin is not a valid plugin. It does not meet the requirements."
        if message:
            self.message = message
        elif plugin_name:
            self.message = (
                f"The requested '{plugin_name}'-Plugin is not a valid plugin. It does not meet the requirements."
            )
        else:
            self.message = default_message
        super().__init__(self.message)

PluginExecutionException

Bases: Exception

Exception that is raised when a plugin execution fails. Can be thrown by the plugin itself.

Source code in src/mmisp/worker/exceptions/plugin_exceptions.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
class PluginExecutionException(Exception):
    """
    Exception that is raised when a plugin execution fails.
    Can be thrown by the plugin itself.
    """

    def __init__(self: Self, plugin_name: str = "", message: str = "") -> None:
        default_message: str = "The requested Plugin could not be executed successfully."
        if message:
            self.message = message
        elif plugin_name:
            self.message = f"The execution of the requested '{plugin_name}'-Plugin failed."
        else:
            self.message = default_message
        super().__init__(self.message)

PluginImportError

Bases: Exception

Exceptions that is raised when a python module of a plugin could not be imported.

Source code in src/mmisp/worker/exceptions/plugin_exceptions.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
class PluginImportError(Exception):
    """
    Exceptions that is raised when a python module of a plugin could not be imported.
    """

    def __init__(self: Self, plugin_module: str | None = None, message: str = "") -> None:
        default_message: str = "The requested Plugin could not be imported."
        if message:
            self.message = message
        elif plugin_module:
            self.message = (
                f"The plugin module {plugin_module} could not be imported. "
                f"Please check it is a valid python module."
            )
        else:
            self.message = default_message
        super().__init__(self.message)

PluginNotFound

Bases: Exception

Exception that is raised when a requested plugin could not be found.

Source code in src/mmisp/worker/exceptions/plugin_exceptions.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class PluginNotFound(Exception):
    """
    Exception that is raised when a requested plugin could not be found.
    """

    def __init__(self: Self, plugin_name: str | None = None, message: str = "") -> None:
        default_message: str = "The requested Plugin could not be found."
        if message:
            self.message = message
        elif plugin_name:
            self.message = f"The requested '{plugin_name}'-Plugin could not be found."
        else:
            self.message = default_message
        super().__init__(self.message)

PluginRegistrationError

Bases: Exception

Exception that is raised when a plugin could not be registered.

Source code in src/mmisp/worker/exceptions/plugin_exceptions.py
37
38
39
40
41
42
43
44
class PluginRegistrationError(Exception):
    """
    Exception that is raised when a plugin could not be registered.
    """

    def __init__(self: Self, message: str) -> None:
        self.message = message
        super().__init__(self.message)

JobException

Bases: Exception

Exception raised when an error occurred while processing a job

Source code in src/mmisp/worker/exceptions/job_exceptions.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class JobException(Exception):
    """
    Exception raised when an error occurred while processing a job
    """

    def __init__(
        self: Self, job_id: str | None = None, message: str = "An error occurred while processing the Job"
    ) -> None:
        self.message: str
        if job_id:
            self.message = f"An error occurred while processing the Job with id: {job_id}"
        else:
            self.message = message
        super().__init__(self.message)

JobHasNoResultException

Bases: Exception

Exception raised when a requested job has no result that can be returned

Source code in src/mmisp/worker/exceptions/job_exceptions.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class JobHasNoResultException(Exception):
    """
    Exception raised when a requested job has no result that can be returned
    """

    def __init__(
        self: Self,
        job_id: str | None = None,
        job_type: str | None = None,
        message: str = "The requestet Jobtype has no result that can be returned",
    ) -> None:
        self.message: str
        if job_id is None and job_type is None:
            self.message = message
        elif job_id is None:
            self.message = f"The requested Job of type {job_type} has no result that can be returned"
        elif job_type is None:
            self.message = f"The requested Job with id: {job_id} has no result that can be returned"
        else:
            self.message = (
                f"The requested Job with id: {job_id} is of type {job_type}, which has no result that can "
                f"be returned"
            )
        super().__init__(self.message)

JobNotFinishedException

Bases: Exception

Exception raised when a requested job is not finished yet

Source code in src/mmisp/worker/exceptions/job_exceptions.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class JobNotFinishedException(Exception):
    """
    Exception raised when a requested job is not finished yet
    """

    def __init__(
        self: Self, job_id: str | None = None, message: str = "The Job is not finished yet, please try again later"
    ) -> None:
        self.message: str
        if job_id is None:
            self.message = message
        else:
            self.message = f"The Job with id: {job_id} is not finished yet, please try again later"

        super().__init__(self.message)

NotExistentJobException

Bases: Exception

Exception raised when a requested job does not exist

Source code in src/mmisp/worker/exceptions/job_exceptions.py
20
21
22
23
24
25
26
27
28
29
30
31
class NotExistentJobException(Exception):
    """
    Exception raised when a requested job does not exist
    """

    def __init__(self: Self, job_id: str | None = None, message: str = "The requested Job does not exist") -> None:
        self.message: str
        if job_id is None:
            self.message = message
        else:
            self.message = f"The requested job with id: {job_id} does not exist"
        super().__init__(self.message)

EnvVariableNotFound

Bases: Exception

Exception raised when an environment variable is not found

Source code in src/mmisp/worker/exceptions/environment_exceptions.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class EnvVariableNotFound(Exception):
    """
    Exception raised when an environment variable is not found
    """

    def __init__(
        self: Self, env_var: str | None = None, message: str = "A requested environment variable was not found"
    ) -> None:
        self.message: str
        if env_var is None:
            self.message = message
        else:
            self.message = f'The environment variable "{env_var}" could not be found'
        super().__init__(self.message)

APIException

Bases: Exception

Exception raised when an error occurred while processing an API request

Source code in src/mmisp/worker/exceptions/misp_api_exceptions.py
 4
 5
 6
 7
 8
 9
10
11
class APIException(Exception):
    """
    Exception raised when an error occurred while processing an API request
    """

    def __init__(self: Self, message: str = "An Error occurred while processing the API request") -> None:
        self.message = message
        super().__init__(self.message)

InvalidAPIResponse

Bases: Exception

Exception raised when an API response is not valid

Source code in src/mmisp/worker/exceptions/misp_api_exceptions.py
14
15
16
17
18
19
20
21
class InvalidAPIResponse(Exception):
    """
    Exception raised when an API response is not valid
    """

    def __init__(self: Self, message: str = "The API response is not valid") -> None:
        self.message = message
        super().__init__(self.message)

ForbiddenByServerSettings

Bases: Exception

Exception raised when a requested action was denied by another servers settings

Source code in src/mmisp/worker/exceptions/server_exceptions.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
class ForbiddenByServerSettings(Exception):
    """
    Exception raised when a requested action was denied by another servers settings
    """

    def __init__(
        self: Self,
        server_id: str | None = None,
        message: str = "A requested action was denied by another servers settings",
    ) -> None:
        if server_id is None:
            self.message: str = message
        else:
            self.message = f"The requested action was denied by the server with id: {server_id} because of its settings"
        super().__init__(self.message)

InvalidServerVersion

Bases: Exception

Exception raised when a server has an invalid version

Source code in src/mmisp/worker/exceptions/server_exceptions.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class InvalidServerVersion(Exception):
    """
    Exception raised when a server has an invalid version
    """

    def __init__(
        self: Self,
        server_id: str | None = None,
        message: str = "Another server that was requested has an invalid version",
    ) -> None:
        if server_id is None:
            self.message = message
        else:
            self.message = f"The server with id: {server_id} has an invalid version"
        super().__init__(self.message)

ServerNotReachable

Bases: Exception

Exception raised when a server is not reachable

Source code in src/mmisp/worker/exceptions/server_exceptions.py
21
22
23
24
25
26
27
28
29
30
31
class ServerNotReachable(Exception):
    """
    Exception raised when a server is not reachable
    """

    def __init__(self: Self, server_id: str | None = None, message: str = "A server is not reachable") -> None:
        if server_id is None:
            self.message = message
        else:
            self.message = f"The server with id: {server_id} is not reachable"
        super().__init__(self.message)