Comprehensive Open Source Intelligence Collection & Analysis Protocol
Systematic approach to open source intelligence gathering, verification, and analysis for strategic decision-making.
Establish clear intelligence requirements, define scope, and identify key information needs (a collection plan sketch follows these phase descriptions).
Identify and categorise relevant open-source information channels and platforms.
Execute systematic collection across identified sources using appropriate tools and techniques.
Validate accuracy and reliability of collected information through multiple sources.
Transform raw information into actionable intelligence through systematic analysis.
Communicate findings in formats tailored to specific audiences and decision-making needs; illustrative verification, analysis, and reporting sketches follow the collection framework below.
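The requirement-definition and source-identification phases above can be captured as a small, machine-readable collection plan before any scraping begins. The sketch below is illustrative only: every field name, URL, and value is a placeholder assumption rather than part of the framework that follows.

# Collection plan sketch: intelligence requirements and candidate sources
# (all field names, URLs, and values here are hypothetical placeholders)
collection_plan = {
    'requirement_id': 'PIR-001',
    'question': 'What open-source reporting indicates emerging instability in the region of interest?',
    'scope': {
        'timeframe_days': 30,
        'languages': ['en'],
        'collection_limits': 'publicly available sources only'
    },
    'priority_sources': [
        {'name': 'Regional news coverage', 'category': 'news media', 'url': 'https://example-news.com/geopolitics'},
        {'name': 'Official press releases', 'category': 'government', 'url': 'https://example-gov.example/press'}
    ],
    'deliverable': 'weekly summary brief'
}

# URLs from the plan can then be handed to the collector defined below
planned_urls = [source['url'] for source in collection_plan['priority_sources']]

Keeping the plan as plain data makes it easy to review with stakeholders and to feed directly into the collection loop shown in the usage example.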
# OSINT Web Scraping Framework
import requests
from bs4 import BeautifulSoup
import time
import random
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser
import json
import hashlib
class OSINTCollector:
    def __init__(self):
        self.session = requests.Session()
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
        ]
        self.collected_data = []
        self.seen_hashes = set()  # Deduplication
        self.robots_cache = {}

    def _check_robots_txt(self, url):
        """Verify collection is permitted by robots.txt"""
        parsed = urlparse(url)
        domain = parsed.netloc
        if domain not in self.robots_cache:
            rp = RobotFileParser()
            rp.set_url(f"{parsed.scheme}://{domain}/robots.txt")
            try:
                rp.read()
                self.robots_cache[domain] = rp
            except Exception:
                # If robots.txt cannot be retrieved, record that and fall through
                self.robots_cache[domain] = None
        if self.robots_cache[domain]:
            return self.robots_cache[domain].can_fetch("*", url)
        return True

    def _calculate_content_hash(self, data):
        """Generate hash for deduplication"""
        content_str = json.dumps(data, sort_keys=True)
        return hashlib.sha256(content_str.encode()).hexdigest()
    def collect_from_source(self, url, selectors):
        """
        Collect structured data from web sources with validation
        """
        # Verify robots.txt compliance
        if not self._check_robots_txt(url):
            print(f"Collection not permitted by robots.txt: {url}")
            return None
        try:
            # Rotate User-Agent for anti-detection
            self.session.headers.update({
                'User-Agent': random.choice(self.user_agents)
            })
            # Respectful scraping with randomized delays
            time.sleep(random.uniform(2, 5))
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')

            # Extract data based on CSS selectors with fallback handling
            data = {}
            fallbacks = selectors.get('fallback', {})
            for field, selector in selectors.items():
                if field == 'fallback':
                    # The fallback map is configuration, not a field to extract
                    continue
                elements = soup.select(selector)
                if not elements and field in fallbacks:
                    # Try the fallback selector if the primary one matches nothing
                    elements = soup.select(fallbacks[field])
                data[field] = [elem.get_text(strip=True) for elem in elements]

            # Validate extraction success
            if all(len(v) == 0 for v in data.values()):
                print(f"Warning: No data extracted from {url} - selectors may need updating")
                return None

            # Deduplication check on the extracted content only, before metadata
            # is added (the collection timestamp would otherwise make every record unique)
            content_hash = self._calculate_content_hash(data)
            if content_hash in self.seen_hashes:
                print(f"Duplicate content detected from {url}")
                return None
            self.seen_hashes.add(content_hash)

            # Add metadata for provenance tracking
            data['source_url'] = url
            data['collection_timestamp'] = time.time()
            data['domain'] = urlparse(url).netloc
            data['response_code'] = response.status_code

            self.collected_data.append(data)
            return data
        except requests.exceptions.Timeout:
            print(f"Timeout collecting from {url}")
            return None
        except requests.exceptions.HTTPError as e:
            print(f"HTTP error {e.response.status_code} from {url}")
            return None
        except Exception as e:
            print(f"Error collecting from {url}: {str(e)}")
            return None
# Usage Example
collector = OSINTCollector()

# CSS selectors with fallback options for resilience
news_selectors = {
    'headlines': 'h1, h2.headline',
    'articles': '.article-content p',
    'authors': '.author-name',
    'timestamps': '.publish-date',
    'fallback': {
        'headlines': 'article h1, .post-title',
        'timestamps': 'time, .date'
    }
}

# Collect from multiple sources with error handling
sources = ['https://example-news.com/geopolitics']
for source in sources:
    result = collector.collect_from_source(source, news_selectors)
    if result:
        print(f"Successfully collected from {source}")
This OSINT methodology has been successfully implemented across various intelligence disciplines and operational contexts.
It has been applied to cyber threat hunting, actor attribution, and campaign tracking across multiple threat landscapes.
It has enhanced due diligence investigations, competitive intelligence, and supply chain risk assessment.
It supports regional stability monitoring, conflict analysis, and strategic warning intelligence production.
It also underpins AML/KYC investigations, sanctions screening, and regulatory compliance verification.