···354354 return s
355355356356357357+def _safe_urlparse(url: str) -> Optional[ParseResult]:
358358+ """Safely parse a URL, returning None for malformed URLs (e.g., invalid IPv6)."""
359359+ try:
360360+ return urlparse(url)
361361+ except ValueError:
362362+ # urlparse raises ValueError for malformed URLs like invalid IPv6 addresses
363363+ return None
364364+365365+357366class StringExtractDomains(UDFBase[StringArguments, List[str]]):
358367 """
359368 Used to extract a list of potential URL domains from a string of tokens. Returns a list
def execute(self, execution_context: ExecutionContext, arguments: StringArguments) -> List[str]:
    """
    Collect the distinct hosts of every http(s) URL found in ``arguments.s``.

    Tokens for which ``_safe_urlparse`` returns ``None``, or that lack a scheme
    or a network location, are skipped. Ports and any ``user:password@`` prefix
    are stripped from the host; bracketed IPv6 literals keep their brackets.

    :return: the de-duplicated hosts, in unspecified order.
    """

    def extract_host(netloc: str) -> str:
        # Drop any userinfo first: in "user:pw@host:port" the text before the
        # last "@" is credentials, not the host, and must not be reported.
        host_port = netloc.rpartition('@')[2]
        # IPv6 addresses are enclosed in brackets, e.g. [::1]:8080
        if host_port.startswith('['):
            bracket_end = host_port.find(']')
            if bracket_end != -1:
                return host_port[: bracket_end + 1]
        # Regular hostname:port - split on colon to strip port
        return host_port.split(':')[0]

    # split the message into individual tokens as based on a modified URL regex from messages_common.
    # should capture space based links and markdown based links without duplication.
    # The pattern is a raw string so that \/ \s \) are not treated as (invalid) string escapes.
    potential_urls: Iterator[Optional[ParseResult]] = (
        _safe_urlparse(token) for token in re.findall(r'(https?:\/\/[^\/\s][^\s\)>]+)', arguments.s)
    )

    # filter out any tokens that do not have a scheme or a domain (or failed to parse)
    candidate_hosts = (
        extract_host(url.netloc) for url in potential_urls if url is not None and url.scheme and url.netloc
    )
    # a netloc such as ":80" or "user@" leaves an empty host, which is not a domain
    valid_domains: Set[str] = {host for host in candidate_hosts if host}

    # return any valid domains encountered in the message
    return list(valid_domains)
···389409 def execute(self, execution_context: ExecutionContext, arguments: StringArguments) -> List[str]:
390410 # split the message into individual tokens as based on a modified URL regex from messages_common.
391411 # should capture space based links and markdown based links without duplication.
392392- potential_urls: Iterator[ParseResult] = (
393393- urlparse(token) for token in re.findall('(https?:\/\/[^\/\s][^\s\)>]+)', arguments.s)
412412+ potential_urls: Iterator[Optional[ParseResult]] = (
413413+ _safe_urlparse(token) for token in re.findall('(https?:\/\/[^\/\s][^\s\)>]+)', arguments.s)
394414 )
395415396396- # filter out any tokens that do not have a scheme or a domain
416416+ # filter out any tokens that do not have a scheme or a domain (or failed to parse)
397417 valid_urls: Set[str] = set(
398398- urlunparse(parsed_url) for parsed_url in potential_urls if parsed_url.scheme and parsed_url.netloc
418418+ urlunparse(parsed_url)
419419+ for parsed_url in potential_urls
420420+ if parsed_url is not None and parsed_url.scheme and parsed_url.netloc
399421 )
400422401423 # return any valid urls encountered in the message