import pprint
import time
import ntplib
from ipaddress import ip_address
import json
from typing import Optional, Tuple, Any
import requests
from server.app.dtos.AdvancedSettings import AdvancedSettings
from server.app.utils.analyze_ntp_versions import *
from server.app.utils.nts_check import perform_nts_measurement_domain_name
from server.app.dtos.ProbeData import ServerLocation
from server.app.utils.location_resolver import get_country_for_ip, get_coordinates_for_ip
from server.app.models.CustomError import InputError, RipeMeasurementError
from server.app.utils.calculations import ntp_precise_time_to_human_date, convert_float_to_precise_time, \
get_non_responding_ntp_measurement
from server.app.utils.ip_utils import get_ip_family, ref_id_to_ip_or_name, get_server_ip, ip_to_str
from server.app.utils.load_config_data import get_ripe_account_email, get_ripe_api_token, get_ntp_version, \
get_timeout_measurement_s, get_ripe_number_of_probes_per_measurement, \
get_ripe_timeout_per_probe_ms, get_ripe_packets_per_probe, get_right_ntp_nts_binary_tool_for_your_os
from server.app.utils.ripe_probes import get_probes
from server.app.utils.domain_name_to_ip import domain_name_to_ip_list
from server.app.dtos.NtpExtraDetails import NtpExtraDetails
from server.app.dtos.NtpMainDetails import NtpMainDetails
from server.app.dtos.NtpMeasurement import NtpMeasurement
from server.app.dtos.NtpServerInfo import NtpServerInfo
from server.app.dtos.NtpTimestamps import NtpTimestamps
from server.app.dtos.PreciseTime import PreciseTime
from server.app.utils.validate import is_ip_address
[docs]
def perform_ntp_measurement_domain_name_list(server_name: str, client_ip: Optional[str] = None, wanted_ip_type: int = 4,
                                             ntp_version: int = get_ntp_version()) -> Optional[list[NtpMeasurement]]:
    """
    Measure an NTP server through every IP address its domain name resolves to.

    Each resolved address is queried on its own. An address that answers contributes its converted
    measurement; an address whose query raises contributes a "non-responding" placeholder measurement,
    so one failing address does not abort the remaining ones.

    Args:
        server_name (str): The domain name of the NTP server.
        client_ip (Optional[str]): The IP address of the client (if given). It is forwarded to the resolver.
        wanted_ip_type (int): The IP type that we want to measure.
        ntp_version (int): The NTP version to use. Note that the default is computed once, when this
            function is defined, not on every call.

    Returns:
        Optional[list[NtpMeasurement]]: The collected measurements (real ones and placeholders), or None
            if not a single address produced a real measurement (this includes an empty resolver result).

    Raises:
        DNSError: If the domain name is invalid or cannot be converted to an IP list.
    """
    resolved_ips: list[str] = domain_name_to_ip_list(server_name, client_ip, wanted_ip_type)

    collected = []
    got_answer = False
    for ip_str in resolved_ips:
        try:
            stats = ntplib.NTPClient().request(ip_str, ntp_version, timeout=get_timeout_measurement_s())
            measurement = convert_ntp_response_to_measurement(response=stats,
                                                              server_ip_str=ip_str,
                                                              server_name=server_name,
                                                              ntp_version=ntp_version)
        except Exception as e:
            # This address failed; record a placeholder and keep trying the other addresses.
            print(f"Error in measure from name on ip {ip_str} (this IP failed, maybe others succeeded):", e)
            collected.append(get_non_responding_ntp_measurement(ip_str, server_name, ntp_version))
        else:
            # A response that could not be converted is dropped silently (no placeholder is added).
            if measurement is not None:
                collected.append(measurement)
                got_answer = True

    # Placeholders alone do not count as a result.
    if not got_answer:
        return None
    return collected
[docs]
def convert_ntp_response_to_measurement(response: ntplib.NTPStats, server_ip_str: str, server_name: Optional[str],
                                        ntp_version: int = get_ntp_version()) -> Optional[NtpMeasurement]:
    """
    Build an NtpMeasurement object out of a raw ntplib response.

    Any exception raised while assembling the measurement is printed and swallowed; the
    caller then receives None instead.

    Args:
        response (ntplib.NTPStats): The NTP response to convert.
        server_ip_str (str): The IP address of the NTP server in string format.
        server_name (Optional[str]): The name of the NTP server.
        ntp_version (int): The NTP version that was used. The default is computed once, when this
            function is defined.

    Returns:
        Optional[NtpMeasurement]: The measurement if the conversion succeeded, otherwise None.
    """
    try:
        # The vantage point is this server's own address of the same IP family as the target
        # (it stays None when get_server_ip has nothing to offer for that family).
        family = get_ip_family(server_ip_str)
        vantage_point_ip = get_server_ip(family)

        ref_ip, ref_name = ref_id_to_ip_or_name(response.ref_id,
                                                response.stratum,
                                                get_ip_family(server_ip_str))

        server_ip = ip_address(server_ip_str)
        location = ServerLocation(country_code=get_country_for_ip(ip_to_str(server_ip)),
                                  coordinates=get_coordinates_for_ip(ip_to_str(server_ip)))
        server_info: NtpServerInfo = NtpServerInfo(ntp_version=ntp_version,
                                                   ntp_server_ip=server_ip,
                                                   ntp_server_name=server_name,
                                                   ntp_server_ref_parent_ip=ref_ip,
                                                   ref_name=ref_name,
                                                   ntp_server_location=location)

        # Each float timestamp is split into its integer and fractional parts
        # using ntplib's private helpers.
        sent_by_client = PreciseTime(ntplib._to_int(response.orig_timestamp),
                                     ntplib._to_frac(response.orig_timestamp))
        received_by_server = PreciseTime(ntplib._to_int(response.recv_timestamp),
                                         ntplib._to_frac(response.recv_timestamp))
        sent_by_server = PreciseTime(ntplib._to_int(response.tx_timestamp),
                                     ntplib._to_frac(response.tx_timestamp))
        received_by_client = PreciseTime(ntplib._to_int(response.dest_timestamp),
                                         ntplib._to_frac(response.dest_timestamp))
        timestamps: NtpTimestamps = NtpTimestamps(client_sent_time=sent_by_client,
                                                  server_recv_time=received_by_server,
                                                  server_sent_time=sent_by_server,
                                                  client_recv_time=received_by_client)

        # Reachability is not derived from the response; it is left as an empty string here.
        main_details: NtpMainDetails = NtpMainDetails(offset=response.offset,
                                                      rtt=response.delay,
                                                      stratum=response.stratum,
                                                      precision=response.precision,
                                                      reachability="")

        extra_details: NtpExtraDetails = NtpExtraDetails(
            root_delay=convert_float_to_precise_time(response.root_delay),
            ntp_last_sync_time=convert_float_to_precise_time(response.ref_timestamp),
            leap=response.leap,
            poll=response.poll,
            root_dispersion=convert_float_to_precise_time(response.root_dispersion))

        return NtpMeasurement(vantage_point_ip, server_info, timestamps, main_details, extra_details)
    except Exception as e:
        print("Error in convert response to measurement:", e)
        return None
[docs]
def analyze_supported_ntp_versions(server: str, settings: AdvancedSettings) -> dict:
    """
    Analyze which NTP versions (NTPv1 up to the NTPv5 draft) a server supports. NTS is not covered here.

    For every analyzed version the result holds three entries: "<version>_supported_confidence",
    "<version>_analysis" and "<version>_m_result". The confidence goes from 0 to 100:

        0   -> not supported at all (no response)
        25  -> received something, but failed to get the data, or invalid format
        50  -> received an NTP response, but with a different NTP version (honest server)
        75  -> received an NTP response with the correct version, but the content seems to be
               from another NTP version (the server may have lied)
        100 -> fully valid response for that NTP version

    Args:
        server (str): The server to analyze (domain name or IP address).
        settings (AdvancedSettings): The settings to use for the analysis. They select either all
            versions at once or an explicit list of versions, plus the NTPv5 draft to use.

    Returns:
        dict: The analysis for each requested NTP version, or a dictionary with a single "error" key
            if the binary tool could not be located.
    """
    try:
        tool = get_right_ntp_nts_binary_tool_for_your_os()
    except Exception:
        # Python was not able to find (or run) the tool, so no analysis is possible.
        return {"error": "NTP versions test could not be performed (binary tool not available)."}

    # Asking for every version in one go is easier and more reliable than going one by one.
    if settings.analyse_all_ntp_versions:
        return directly_analyze_all_ntp_versions(server, str(tool), settings.ntpv5_draft)

    report: dict = {}
    # The list of versions in the settings is assumed to be valid input.
    for version in settings.ntp_versions_to_analyze:
        # settings.ntpv5_draft is considered if and only if the version is "ntpv5".
        confidence, analysis, m_result = run_tool_on_ntp_version(server, str(tool),
                                                                 version, settings.ntpv5_draft)
        report[version + "_supported_confidence"] = confidence
        report[version + "_analysis"] = analysis
        report[version + "_m_result"] = m_result
        # Pause between consecutive runs so that the server does not answer with RATE.
        time.sleep(1)
    return report
[docs]
def print_ntp_measurement(measurement: NtpMeasurement) -> bool:
    """
    Print an NTP measurement in a human-readable format.

    Args:
        measurement (NtpMeasurement): The NtpMeasurement object to print.

    Returns:
        bool: True if everything was printed, False if an exception interrupted the printing
            (the exception is printed as well; lines printed before it stay on the output).
    """
    try:
        print("=== NTP Measurement ===")
        print(f"Vantage Point IP: {measurement.vantage_point_ip}")

        # Section: server information
        info = measurement.server_info
        print(f"Server Name: {info.ntp_server_name}")
        print(f"Server IP: {info.ntp_server_ip}")
        print(f"NTP Version: {info.ntp_version}")
        print(f"Reference Parent IP: {info.ntp_server_ref_parent_ip}")
        print(f"Reference Name (Raw): {info.ref_name}")

        # Section: the four timestamps, each as a readable date plus its raw seconds/fraction parts
        stamps = measurement.timestamps
        t1 = stamps.client_sent_time
        print(f"Client sent time: {ntp_precise_time_to_human_date(t1)} : s: {t1.seconds} f: {t1.fraction}")
        t2 = stamps.server_recv_time
        print(f"Server recv time: {ntp_precise_time_to_human_date(t2)} : s: {t2.seconds} f: {t2.fraction}")
        t3 = stamps.server_sent_time
        print(f"Server sent time: {ntp_precise_time_to_human_date(t3)} : s: {t3.seconds} f: {t3.fraction}")
        t4 = stamps.client_recv_time
        print(f"Client recv time: {ntp_precise_time_to_human_date(t4)} : s: {t4.seconds} f: {t4.fraction}")

        # Section: main details
        details = measurement.main_details
        print(f"Offset (s): {details.offset}")
        print(f"Delay (s): {details.rtt}")
        print(f"Stratum: {details.stratum}")
        print(f"Precision: {details.precision}")
        print(f"Reachability: {details.reachability}")

        # Section: extra details (the root delay is turned back into a single float for display)
        more = measurement.extra_details
        root_delay = more.root_delay
        print(f"Root Delay: {ntplib._to_time(root_delay.seconds, root_delay.fraction)}")
        print(f"Last Sync Time: {ntp_precise_time_to_human_date(more.ntp_last_sync_time)}")
        print(f"Poll: {more.poll}")
        print(f"Leap: {more.leap}")
        print("=========================")
    except Exception as e:
        print("Error:", e)
        return False
    return True
[docs]
def perform_ripe_measurement_domain_name(server_name: str, settings: AdvancedSettings,
                                         probes_requested: int =
                                         get_ripe_number_of_probes_per_measurement()) -> int:
    """
    This method performs a RIPE measurement on a domain name. It lets the RIPE Atlas probes
    decide which IP of that domain name to use. (You can see this IP in the details of the
    measurement by looking it up with the returned measurement ID.)

    Args:
        server_name (str): The domain name of the NTP server.
        settings (AdvancedSettings): The settings to use. The custom client IP is read from here.
        probes_requested (int): The number of probes requested. The default is computed once,
            when this function is defined.

    Returns:
        int: The ID of the created measurement.

    Raises:
        InputError: If `probes_requested` is not greater than 0. Validation of the client IP
            (done by `get_ip_family`) presumably reports invalid input the same way.
        RipeMeasurementError: If RIPE Atlas did not return a measurement ID: the response was
            not valid JSON, it carried an "error" entry, or it had an unexpected shape.
    """
    client_ip = settings.custom_client_ip
    if probes_requested <= 0:
        raise InputError("Probes requested must be greater than 0.")
    get_ip_family(client_ip)  # throws an error if it is invalid

    # The probes measure the domain name directly; the IP type in the settings forces the family.
    headers, request_content = get_request_settings(ntp_server=server_name,
                                                    settings=settings, probes_requested=probes_requested)

    # NOTE(review): no timeout is passed, so this call waits for as long as the server takes.
    response = requests.post(
        "https://atlas.ripe.net/api/v2/measurements/",
        headers=headers,
        data=json.dumps(request_content)
    )

    # A non-JSON body (for example an HTML error page from a gateway) must surface as the
    # documented RipeMeasurementError, not as a bare decoding error.
    try:
        data = response.json()
    except ValueError as e:
        raise RipeMeasurementError(
            f"Ripe measurement failed: response is not valid JSON (HTTP {response.status_code})") from e

    # The answer has a list of measurements, but only one was requested, so the first ID is returned.
    try:
        return data["measurements"][0]
    except (KeyError, IndexError, TypeError) as e:
        # Only look for an "error" entry when the payload is a JSON object; other payload
        # types would make the membership test itself fail or be meaningless.
        if isinstance(data, dict) and "error" in data:
            raise RipeMeasurementError(data["error"]) from e
        raise RipeMeasurementError(f"Ripe measurement failed:{e}") from e
[docs]
def get_request_settings(ntp_server: str, settings: AdvancedSettings,
                         probes_requested: int = get_ripe_number_of_probes_per_measurement()) -> tuple[dict, dict]:
    """
    Build the HTTP headers and the request body needed to create a RIPE Atlas NTP measurement.

    Args:
        ntp_server (str): The NTP server IP address or domain name (used as the measurement target).
        settings (AdvancedSettings): The settings to use. The wanted IP type is read from here, and the
            whole object is handed to the probe selection.
        probes_requested (int): The number of probes requested. The default is computed once,
            when this function is defined.

    Returns:
        tuple[dict, dict]: The request headers and the request content, in this order.

    Raises:
        Nothing is raised directly here; exceptions from the configuration getters or from the probe
        selection propagate unchanged (presumably InputError, RipeMeasurementError, or ValueError
        for badly set environment variables -- TODO confirm against those helpers).
    """
    headers = {
        "Authorization": f"Key {get_ripe_api_token()}",
        "Content-Type": "application/json"
    }

    # One-off NTP measurement; the target name is resolved by each probe itself.
    ntp_definition = {
        "type": "ntp",
        "af": settings.wanted_ip_type,
        "resolve_on_probe": True,
        "description": f"NTP measurement to {ntp_server}",
        "packets": get_ripe_packets_per_probe(),
        "timeout": get_ripe_timeout_per_probe_ms(),
        "skip_dns_check": False,
        "target": ntp_server
    }

    request_content = {
        "definitions": [ntp_definition],
        "is_oneoff": True,
        "bill_to": get_ripe_account_email(),
        "probes": get_probes(settings, probes_requested)  # we want probes close to the client
    }
    return headers, request_content
# example to see how you use them
# s=AdvancedSettings()
# s.analyse_all_ntp_versions=True
# # s.ntp_versions_to_analyze=["ntpv1", "ntpv2", "ntpv3", "ntpv4", "ntpv5"]
# s.ntpv5_draft="draft-ietf-ntp-ntpv5-06"
# pprint.pprint(analyze_supported_ntp_versions("time.google.com",s))
# import threading
#
# def run_tool():
# s = AdvancedSettings()
# s.analyse_all_ntp_versions = True
# # s.ntp_versions_to_analyze=["ntpv1", "ntpv2", "ntpv3", "ntpv4", "ntpv5"]
# s.ntpv5_draft = "draft-ietf-ntp-ntpv5-06"
# pprint.pprint(analyze_supported_ntp_versions("time.google.com", s))
#
# threading.Thread(target=run_tool).start()
# print_ntp_measurement(perform_ntp_measurement_domain_name_list("time.apple.com", "5a01:c741:a16:4000::1f2", 6, 4)[0])
# print_ntp_measurement(perform_ntp_measurement_domain_name_list("time.cloudflare.com", "17.253.6.45", 4,4)[0])
# print_ntp_measurement(perform_ntp_measurement_domain_name_list("ntpd-rs.sidnlabs.nl", None, 4,5)[0])
# print_ntp_measurement(perform_ntp_measurement_domain_name_list("time.apple.com", "17.253.6.45", 6,4)[0])
# print(perform_ripe_measurement_ip("2a01:b740:a16:4000::1f2","2a01:c741:a16:4000::1f2", 12))
# print(perform_ripe_measurement_domain_name("time.apple.com","83.25.24.10", 6, 15))