Source code for server.app.utils.perform_measurements

import pprint
import time

import ntplib
from ipaddress import ip_address
import json
from typing import Optional, Tuple, Any
import requests

from server.app.dtos.AdvancedSettings import AdvancedSettings
from server.app.utils.analyze_ntp_versions import *
from server.app.utils.nts_check import perform_nts_measurement_domain_name
from server.app.dtos.ProbeData import ServerLocation
from server.app.utils.location_resolver import get_country_for_ip, get_coordinates_for_ip
from server.app.models.CustomError import InputError, RipeMeasurementError
from server.app.utils.calculations import ntp_precise_time_to_human_date, convert_float_to_precise_time, \
    get_non_responding_ntp_measurement
from server.app.utils.ip_utils import get_ip_family, ref_id_to_ip_or_name, get_server_ip, ip_to_str
from server.app.utils.load_config_data import get_ripe_account_email, get_ripe_api_token, get_ntp_version, \
    get_timeout_measurement_s, get_ripe_number_of_probes_per_measurement, \
    get_ripe_timeout_per_probe_ms, get_ripe_packets_per_probe, get_right_ntp_nts_binary_tool_for_your_os
from server.app.utils.ripe_probes import get_probes
from server.app.utils.domain_name_to_ip import domain_name_to_ip_list
from server.app.dtos.NtpExtraDetails import NtpExtraDetails
from server.app.dtos.NtpMainDetails import NtpMainDetails
from server.app.dtos.NtpMeasurement import NtpMeasurement
from server.app.dtos.NtpServerInfo import NtpServerInfo
from server.app.dtos.NtpTimestamps import NtpTimestamps
from server.app.dtos.PreciseTime import PreciseTime
from server.app.utils.validate import is_ip_address



def perform_ntp_measurement_domain_name_list(server_name: str, client_ip: Optional[str] = None,
                                             wanted_ip_type: int = 4,
                                             ntp_version: int = get_ntp_version()) -> Optional[list[NtpMeasurement]]:
    """
    Measure an NTP server through every IP address its domain name resolves to.

    Each resolved IP is queried independently. An IP whose request or conversion raises
    is represented in the result by the placeholder returned from
    `get_non_responding_ntp_measurement`. An IP whose response converts to `None`
    contributes no entry at all.

    Args:
        server_name (str): The name of the ntp server.
        client_ip (Optional[str]): The IP address of the client (if given).
        wanted_ip_type (int): The IP type that we want to measure.
        ntp_version (int): The version of the ntp that you want to use.

    Returns:
        Optional[list[NtpMeasurement]]: The collected measurements (successful ones and
            placeholders for failed IPs), or None if not a single IP produced a
            successful measurement.

    Raises:
        DNSError: If the domain name is invalid or cannot be converted to an IP list
            (propagated from `domain_name_to_ip_list`).
    """
    # The resolver only hands back IPs that are suitable for measuring.
    resolved_ips: list[str] = domain_name_to_ip_list(server_name, client_ip, wanted_ip_type)

    measurements = []
    at_least_one_success = False
    for ip_str in resolved_ips:
        try:
            ntp_client = ntplib.NTPClient()
            raw_response = ntp_client.request(ip_str, ntp_version, timeout=get_timeout_measurement_s())
            measurement = convert_ntp_response_to_measurement(response=raw_response,
                                                              server_ip_str=ip_str,
                                                              server_name=server_name,
                                                              ntp_version=ntp_version)
        except Exception as e:
            # One failing IP must not abort the others: record a placeholder and move on.
            print(f"Error in measure from name on ip {ip_str} (this IP failed, maybe others succeeded):", e)
            measurements.append(get_non_responding_ntp_measurement(ip_str, server_name, ntp_version))
        else:
            if measurement is not None:
                measurements.append(measurement)
                at_least_one_success = True

    if not at_least_one_success:
        return None
    return measurements
def perform_ntp_measurement_ip(server_ip_str: str, ntp_version: int = get_ntp_version()) -> Optional[NtpMeasurement]:
    """
    Measure a single NTP server addressed directly by IP.

    Args:
        server_ip_str (str): The IP address of the ntp server in string format.
        ntp_version (int): The version of the ntp that you want to use.

    Returns:
        Optional[NtpMeasurement]: The measurement, or None when `is_ip_address` rejects
            the input, the request raises (usually a timeout), or the response cannot
            be converted.
    """
    # Reject anything that does not validate as an IP address.
    if is_ip_address(server_ip_str) is None:
        return None

    try:
        ntp_client = ntplib.NTPClient()
        raw_response = ntp_client.request(server_ip_str, ntp_version, timeout=get_timeout_measurement_s())
    except Exception as e:
        print("Error in measure from ip:", e)
        return None
    else:
        # No server name is known on this path; only the IP string is available.
        return convert_ntp_response_to_measurement(response=raw_response,
                                                   server_ip_str=server_ip_str,
                                                   server_name=None,
                                                   ntp_version=ntp_version)
def convert_ntp_response_to_measurement(response: ntplib.NTPStats, server_ip_str: str, server_name: Optional[str],
                                        ntp_version: int = get_ntp_version()) -> Optional[NtpMeasurement]:
    """
    Build an `NtpMeasurement` from a raw ntplib response.

    Args:
        response (ntplib.NTPStats): The NTP response to convert.
        server_ip_str (str): The IP address of the ntp server in string format.
        server_name (Optional[str]): The name of the ntp server.
        ntp_version (int): The version of the ntp that you want to use.

    Returns:
        Optional[NtpMeasurement]: The assembled measurement, or None if any step of the
            conversion raised (the error is printed).
    """

    def _to_precise_time(ntp_timestamp):
        # Split one timestamp into the two parts PreciseTime is built from,
        # using ntplib's own helpers.
        return PreciseTime(ntplib._to_int(ntp_timestamp), ntplib._to_frac(ntp_timestamp))

    try:
        # The vantage point is looked up for the same IP family as the target server;
        # it stays None when no address of that family is available.
        family = get_ip_family(server_ip_str)
        vantage_point_ip = get_server_ip(family)

        ref_ip, ref_name = ref_id_to_ip_or_name(response.ref_id, response.stratum,
                                                get_ip_family(server_ip_str))

        server_ip = ip_address(server_ip_str)
        location = ServerLocation(country_code=get_country_for_ip(ip_to_str(server_ip)),
                                  coordinates=get_coordinates_for_ip(ip_to_str(server_ip)))
        server_info: NtpServerInfo = NtpServerInfo(
            ntp_version=ntp_version,
            ntp_server_ip=server_ip,
            ntp_server_name=server_name,
            ntp_server_ref_parent_ip=ref_ip,
            ref_name=ref_name,
            ntp_server_location=location
        )

        timestamps: NtpTimestamps = NtpTimestamps(
            client_sent_time=_to_precise_time(response.orig_timestamp),
            server_recv_time=_to_precise_time(response.recv_timestamp),
            server_sent_time=_to_precise_time(response.tx_timestamp),
            client_recv_time=_to_precise_time(response.dest_timestamp)
        )

        main_details: NtpMainDetails = NtpMainDetails(
            offset=response.offset,
            rtt=response.delay,
            stratum=response.stratum,
            precision=response.precision,
            reachability=""
        )

        extra_details: NtpExtraDetails = NtpExtraDetails(
            root_delay=convert_float_to_precise_time(response.root_delay),
            ntp_last_sync_time=convert_float_to_precise_time(response.ref_timestamp),
            leap=response.leap,
            poll=response.poll,
            root_dispersion=convert_float_to_precise_time(response.root_dispersion)
        )

        return NtpMeasurement(vantage_point_ip, server_info, timestamps, main_details, extra_details)
    except Exception as e:
        print("Error in convert response to measurement:", e)
        return None
def analyze_supported_ntp_versions(server: str, settings: AdvancedSettings) -> dict:
    """
    Analyze which NTP versions (NTPv1 up to the NTPv5 draft, no NTS) a server supports.

    For every analyzed version the result holds three entries:
    "<version>_supported_confidence", "<version>_analysis" and "<version>_m_result".
    The confidence value expresses how strongly this service believes the server
    supports that version:

        0   -> not supported at all (no response)
        25  -> received something, but failed to get the data, or invalid format
        50  -> received an NTP response, but with a different NTP version (honest server)
        75  -> received an NTP response with the correct version, but the content seems
               to be from another NTP version (server may have lied)
        100 -> fully valid ntpvX response received

    Args:
        server (str): The server to analyze (domain name or IP address).
        settings (AdvancedSettings): The settings to use for the analysis.

    Returns:
        dict: The per-version analysis, or a dictionary holding only an "error" entry
            when the binary tool could not be located.
    """
    try:
        binary_nts_tool = get_right_ntp_nts_binary_tool_for_your_os()
    except Exception:
        # Python could not find or run the tool, so no version test is possible.
        return {"error": "NTP versions test could not be performed (binary tool not available)."}

    tool_path = str(binary_nts_tool)

    if settings.analyse_all_ntp_versions:
        # Easier and more reliable when every NTP version is wanted.
        return directly_analyze_all_ntp_versions(server, tool_path, settings.ntpv5_draft)

    report: dict = {}
    # The requested versions in settings are assumed to be valid input.
    for ntp_version in settings.ntp_versions_to_analyze:
        # settings.ntpv5_draft is considered if and only if ntp_version is "ntpv5".
        confidence, analysis, m_result = run_tool_on_ntp_version(server, tool_path, ntp_version,
                                                                 settings.ntpv5_draft)
        report[ntp_version + "_supported_confidence"] = confidence
        report[ntp_version + "_analysis"] = analysis
        report[ntp_version + "_m_result"] = m_result
        time.sleep(1)  # pause between versions so the server does not answer with RATE
    return report
def perform_ripe_measurement_domain_name(server_name: str, settings: AdvancedSettings,
                                         probes_requested: int = get_ripe_number_of_probes_per_measurement()) -> int:
    """
    Start a RIPE Atlas NTP measurement against a domain name.

    The RIPE Atlas probes decide which IP of the domain name to use. (You can see this IP
    in the details of the measurement by looking it up with the returned measurement ID.)

    Args:
        server_name (str): The domain name of the NTP server.
        settings (AdvancedSettings): The settings to use.
        probes_requested (int): The number of probes requested.

    Returns:
        int: The ID of the created measurement.

    Raises:
        InputError: If `probes_requested` is not greater than 0. An invalid client IP is
            rejected by `get_ip_family`, with whatever error that helper raises.
        RipeMeasurementError: If the RIPE response is not valid JSON or does not contain
            a measurement ID.
    """
    client_ip = settings.custom_client_ip
    if probes_requested <= 0:
        raise InputError("Probes requested must be greater than 0.")
    get_ip_family(client_ip)  # throws an error if it is invalid

    # The measurement runs from the probes towards the domain name;
    # settings.wanted_ip_type forces the address family that is searched.
    headers, request_content = get_request_settings(ntp_server=server_name, settings=settings,
                                                    probes_requested=probes_requested)

    # NOTE(review): no timeout is passed here, so a stalled connection blocks this call;
    # consider adding one once a suitable config value exists.
    response = requests.post(
        "https://atlas.ripe.net/api/v2/measurements/",
        headers=headers,
        data=json.dumps(request_content)
    )

    # A non-JSON body (e.g. a gateway error page) must surface as the documented error
    # type rather than as a bare ValueError.
    try:
        data = response.json()
    except ValueError as e:
        raise RipeMeasurementError(f"Ripe measurement failed:{e}") from e

    # The answer holds a list of measurements, but only one was requested.
    try:
        ans: int = data["measurements"][0]
    except Exception as e:
        # Only look up "error" on a dict payload; other JSON shapes (null, list, number)
        # would make the lookup itself fail.
        if isinstance(data, dict) and "error" in data:
            raise RipeMeasurementError(data["error"]) from e
        raise RipeMeasurementError(f"Ripe measurement failed:{e}") from e
    return ans
def perform_ripe_measurement_ip(ntp_server_ip: str, settings: AdvancedSettings,
                                probes_requested: int = get_ripe_number_of_probes_per_measurement()) -> int:
    """
    Start a RIPE Atlas NTP measurement against an IP address and return its ID.

    Note that `settings.wanted_ip_type` is overwritten with the IP family of
    `ntp_server_ip` before the request is built.

    Args:
        ntp_server_ip (str): The NTP server IP.
        settings (AdvancedSettings): The settings to use.
        probes_requested (int): The number of probes requested.

    Returns:
        int: The ID of the measurement.

    Raises:
        InputError: If `probes_requested` is not greater than 0. An invalid client or
            server IP is rejected by `get_ip_family`, with whatever error that helper
            raises.
        RipeMeasurementError: If the RIPE response is not valid JSON or does not contain
            a measurement ID.
    """
    client_ip = settings.custom_client_ip
    if probes_requested <= 0:
        raise InputError("Probes requested must be greater than 0.")

    get_ip_family(client_ip)  # this will throw an exception if the client_ip is not an IP address
    # this will throw an exception if the ntp_server_ip is not an IP address
    ip_family = get_ip_family(ntp_server_ip)
    settings.wanted_ip_type = ip_family

    headers, request_content = get_request_settings(ntp_server=ntp_server_ip, settings=settings,
                                                    probes_requested=probes_requested)

    # NOTE(review): no timeout is passed here, so a stalled connection blocks this call;
    # consider adding one once a suitable config value exists.
    response = requests.post(
        "https://atlas.ripe.net/api/v2/measurements/",
        headers=headers,
        data=json.dumps(request_content)
    )

    # A non-JSON body (e.g. a gateway error page) must surface as the documented error
    # type rather than as a bare ValueError.
    try:
        data = response.json()
    except ValueError as e:
        raise RipeMeasurementError(f"Ripe measurement failed:{e}") from e

    # The answer holds a list of measurements, but only one was requested.
    try:
        ans: int = data["measurements"][0]
    except Exception as e:
        # Only look up "error" on a dict payload; other JSON shapes (null, list, number)
        # would make the lookup itself fail.
        if isinstance(data, dict) and "error" in data:
            raise RipeMeasurementError(data["error"]) from e
        raise RipeMeasurementError(f"Ripe measurement failed:{e}") from e
    return ans
def get_request_settings(ntp_server: str, settings: AdvancedSettings,
                         probes_requested: int = get_ripe_number_of_probes_per_measurement()) -> tuple[dict, dict]:
    """
    Assemble the HTTP headers and JSON payload for creating a RIPE Atlas NTP measurement.

    Args:
        ntp_server (str): The NTP server IP address or domain name.
        settings (AdvancedSettings): The settings to use; `wanted_ip_type` supplies the
            address family ("af") and the whole object is handed to `get_probes`.
        probes_requested (int): The number of probes requested.

    Returns:
        tuple[dict, dict]: `(headers, request_content)` ready to be posted to RIPE Atlas.

    Raises:
        InputError: If the input is invalid.
        RipeMeasurementError: If the ripe measurement could not be performed.
        ValueError: If some variable in env is not correctly set.
    """
    headers = {
        "Authorization": f"Key {get_ripe_api_token()}",
        "Content-Type": "application/json"
    }

    # A single one-off NTP measurement definition aimed at the requested target.
    ntp_definition = {
        "type": "ntp",
        "af": settings.wanted_ip_type,
        "resolve_on_probe": True,
        "description": f"NTP measurement to {ntp_server}",
        "packets": get_ripe_packets_per_probe(),
        "timeout": get_ripe_timeout_per_probe_ms(),
        "skip_dns_check": False,
        "target": ntp_server
    }

    request_content = {
        "definitions": [ntp_definition],
        "is_oneoff": True,
        "bill_to": get_ripe_account_email(),
        # we want probes close to the client
        "probes": get_probes(settings, probes_requested)
    }
    return headers, request_content
# Examples showing how to use the functions above:
# s=AdvancedSettings()
# s.analyse_all_ntp_versions=True
# # s.ntp_versions_to_analyze=["ntpv1", "ntpv2", "ntpv3", "ntpv4", "ntpv5"]
# s.ntpv5_draft="draft-ietf-ntp-ntpv5-06"
# pprint.pprint(analyze_supported_ntp_versions("time.google.com",s))

# import threading
#
# def run_tool():
#     s = AdvancedSettings()
#     s.analyse_all_ntp_versions = True
#     # s.ntp_versions_to_analyze=["ntpv1", "ntpv2", "ntpv3", "ntpv4", "ntpv5"]
#     s.ntpv5_draft = "draft-ietf-ntp-ntpv5-06"
#     pprint.pprint(analyze_supported_ntp_versions("time.google.com", s))
#
# threading.Thread(target=run_tool).start()

# print_ntp_measurement(perform_ntp_measurement_domain_name_list("time.apple.com", "5a01:c741:a16:4000::1f2", 6, 4)[0])
# print_ntp_measurement(perform_ntp_measurement_domain_name_list("time.cloudflare.com", "17.253.6.45", 4,4)[0])
# print_ntp_measurement(perform_ntp_measurement_domain_name_list("ntpd-rs.sidnlabs.nl", None, 4,5)[0])
# print_ntp_measurement(perform_ntp_measurement_domain_name_list("time.apple.com", "17.253.6.45", 6,4)[0])

# NOTE: the two RIPE examples below pass a client IP string where the current
# signatures expect an AdvancedSettings object; update them before running.
# print(perform_ripe_measurement_ip("2a01:b740:a16:4000::1f2","2a01:c741:a16:4000::1f2", 12))
# print(perform_ripe_measurement_domain_name("time.apple.com","83.25.24.10", 6, 15))