Source code for sirup.utils

import logging
import os
import subprocess
import time
import warnings
from subprocess import PIPE
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from .TemporaryFileWithRootPermission import TemporaryFileWithRootPermission


[docs] def get_ip(echo=False, config_file=None): """Query the current IP address of the computer. The function calls `https://ifconfig.me` and retrieves the IP address. Args: echo (bool, optional): If `True`, logging prints the retrieved IP address config_file (str, optional): Name of the vpn configuration file currently in use. If supplied, logging prints the name of the config file if the IP address cannot be retrieved """ # sources: # https://stackoverflow.com/questions/23013220/max-retries-exceeded-with-url-in-requests, # https://urllib3.readthedocs.io/en/stable/reference/urllib3.util.html session = requests.Session() retry = Retry(total=3, backoff_factor=0.5) adapter = HTTPAdapter(max_retries=retry) session.mount("https://", adapter) session.mount("http://", adapter) try: response = session.get("https://ifconfig.me", timeout=3) if response.status_code == 200: if echo: logging.info("IP is: %s", response.text) return response.text # raise requests.ConnectionError("Failed to get the IP address") msg = "Failed to get the IP address" if config_file is not None: msg = f"{msg}. The config file is {config_file}." logging.info(msg) return 1234 except Exception as e: logging.info("Got an exception: %s", e) raise requests.ConnectionError("Failed to get the IP address")
[docs] def lookup_strings_in_list(strings_to_check, list_of_strings): """Scan a list of strings for presence of one or multiple strings in the same element. Args: strings_to_check (list): the strings to look for. list_of_strings (list): the strings to scan. Returns: bool: `True` if there is at least one element in `list_of_strings` that contains all strings in `strings_to_check`. Example: >>> from sirup.utils import lookup_strings_in_list >>> lookup_strings_in_list(["hello", "world"], ["hello, wonderful world", "hello universe"]) True """ elements_contain_two_strings = [all([s in element for s in strings_to_check]) for element in list_of_strings] #pylint: disable=use-a-generator return any(elements_contain_two_strings)
[docs] def check_connection(log_file, timeout, pwd, waiting_time=2): """Wait and test for established connection until `timeout`. The function repeatedly scans the log file of the openvpn process and looks for a string that indicates that the vpn connection was established. It exits when the string is found or `timeout` is reached. Args: log_file (str): path to the log file of the vpn process. timeout (int): maximum time to wait for a successful conection. pwd (str): user root password. waiting_time (int, optional): Number of seconds to wait between consecutive scans. Returns: bool: indicates whether connection is established. """ start_time = time.time() connected = False while time.time() - start_time < timeout and not connected: time.sleep(waiting_time) log = sudo_read_file(file=log_file, pwd=pwd) connected = "Initialization Sequence Completed" in log[-1] return connected
[docs] def sudo_read_file(file, pwd=None): """Read a file with root permission to a list. Args: file (str or TemporaryFileWithRootPermission): the file to read pwd (str, optional): root password for the file. If file is a `TemporaryFileWithRootPermission` and no password is provided, the password is taken from the `TemporaryFileWithRootPermission` object. Returns: list: Content of the file, each line is one element in the list. """ if isinstance(file, TemporaryFileWithRootPermission): file = file.file_name if pwd is None: pwd = file.password cmd = ["cat", file] if pwd is None: output = subprocess.run(cmd, capture_output=True, check=True) else: cmd.insert(0, "sudo") cmd.insert(1, "-S") output = subprocess.run(cmd, input=pwd.encode(), capture_output=True, check=True) content = output.stdout.decode() content = content.rstrip().splitlines() return content
[docs] def check_password(pwd): # TODO: use this only in the rotator class and test it when writing this. "Run simple command to see if password is correct" with subprocess.Popen(['sudo', "ls"], stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc: _, _ = proc.communicate(input=f"{pwd}\n".encode()) if proc.returncode != 0: raise RuntimeError("Wrong password") return True
[docs] def list_files_with_full_path(directory, rule=None): """Collect all files in a directory and return a list of them with their full path. Args: directory (str): directory path rule (lambda, optional): a lambda function that returns a `bool`. If supplied, `list_files_with_full_path` filters the files in the directory by whether `rule` returns `True`. Returns: list: files, possibly filtered, with the full path. """ files = os.listdir(directory) if rule is not None: files = [rule(f) for f in files if rule(f)] files_with_full_path = [os.path.join(directory, f) for f in files] return files_with_full_path
[docs] class RotationList(list): "Custom list for IP rotation." def __init__(self, *args): super().__init__(*args)
[docs] def __repr__(self): return f"{self.__class__.__name__}({super().__repr__()})"
[docs] def pop_append(self): "Pop first element and append it at the end." el = self.pop(0) self.append(el) return el
[docs] def shuffle(self, randomizer): """Randomly shuffle the list of proxies. This changes the order by which we iterate through them. Args: randomizer (random.Random): pseudo-random number generator. """ randomizer.shuffle(self)
[docs] def get_vpn_pids(): """Extract all openvpn process ids on the machine. Returns: list: all openvpn process IDs. """ pgrep_command = ["pgrep", "openvpn"] pgrep_process = subprocess.Popen(pgrep_command, stdout=subprocess.PIPE, text=True) #pylint: disable=consider-using-with pgrep_output, _ = pgrep_process.communicate() openvpn_pids = pgrep_output.strip().split('\n') return openvpn_pids
[docs] def kill_all_connections(pwd): """Kill all openvpn connections on the machine Args: pwd (str): root password to the machine. """ openvpn_pids = get_vpn_pids() if openvpn_pids == [""]: logging.info("No openvpn processes found to be killed.") else: kill_command = ["sudo", "-S", "kill", "-15"] + openvpn_pids rc = subprocess.run(kill_command, input=pwd.encode(), capture_output=True, check=False) if rc.returncode != 0: msg = f"Killing vpn connections returned with exit status {rc.returncode}" warnings.warn(msg, UserWarning)
# logging.info(msg)