Python 文件操作
1. 打开LOG文件, 然后 封禁top N ip
Logging
使用logging 模块记录日志, 既可以在文件中记录, 也可以在控制台输出
regex
使用 Python 标准库的 re 模块(正则表达式)匹配 IP 地址
ipaddress 模块
Python 3 自带的 ipaddress 模块不仅仅能提取 IP,最重要的是它能验证 IP 的合法性。re 只能检查格式,但 ipaddress 能告诉你 999.999.999.999 是错误的 IP。
思路: 先用简单的字符串分割(split)拿到疑似 IP 的部分,再交给该模块校验。
优点: 严谨,能自动区分 IPv4 和 IPv6,支持子网计算。
import ipaddress
def extract_valid_ip(raw_string):
    """Return the canonical text form of an IP address, or None if invalid.

    Args:
        raw_string: Candidate fragment, e.g. a field split out of a log
            line. Leading and trailing whitespace is ignored.

    Returns:
        ``str(ipaddress.ip_address(...))`` when the fragment is a valid IPv4
        or IPv6 address, otherwise ``None``. Non-string input also yields
        ``None``.
    """
    # Guard first: calling .strip() on None or an int raises AttributeError,
    # which the ValueError handler below would not catch.
    if not isinstance(raw_string, str):
        return None

    potential_ip = raw_string.strip()
    try:
        # ip_address() raises ValueError for anything that is not a valid IP.
        return str(ipaddress.ip_address(potential_ip))
    except ValueError:
        return None
with open
使用 with open 打开文件,然后逐行读取文件内容;with 语句会在代码块结束时自动关闭文件。
collections.Counter
subprocess
subprocess 模块:可以用 Python 调用系统命令行指令(比如 Linux 的 iptables)。
import re
from collections import Counter
import logging
import os
# Configure the root logger once: INFO and above go both to the
# "ip_blocker.log" file and to the console stream.
logging.basicConfig(
    handlers=[logging.FileHandler("ip_blocker.log"), logging.StreamHandler()],
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("IP_Bocker")
def get_top_ips(log_file_path, top_n = 5):
    """Count the first IPv4 address on each log line; return the most frequent.

    Args:
        log_file_path: Path of the text log to scan (read as UTF-8).
        top_n: Maximum number of ``(ip, count)`` pairs to return.

    Returns:
        A list of ``(ip, count)`` tuples ordered from most to least frequent.
        An empty list is returned, and the problem is logged, when the file
        is missing, unreadable, or any other error occurs.
    """
    ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
    ip_counter = Counter()

    if not os.path.exists(log_file_path):
        logger.error(f"Log file not found: {log_file_path}")
        # Return an empty list (not None) so every path yields the same type.
        return []

    try:
        # errors='replace' keeps one undecodable byte from aborting the scan.
        with open(log_file_path, 'r', encoding='utf-8', errors='replace') as file:
            for line in file:
                match = ip_pattern.search(line)
                if not match:
                    continue
                candidate = match.group()
                # The regex only checks the dotted shape; reject octets > 255
                # so strings such as 999.999.999.999 are never counted.
                if all(int(octet) <= 255 for octet in candidate.split('.')):
                    ip_counter[candidate] += 1
        return ip_counter.most_common(top_n)
    except FileNotFoundError:
        # The file can still vanish between the exists() check and open().
        logger.error(f"Error: The file {log_file_path} does not exist.")
        return []
    except PermissionError:
        logger.error(f"Error: Permission denied to read {log_file_path}.")
        return []
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
        return []
import subprocess
def block_ip(ip):
    """Insert an iptables DROP rule for ``ip`` unless one already exists.

    Args:
        ip: Source address passed to ``iptables -s``.

    Returns:
        None. The outcome (already blocked, blocked, or failed) is logged;
        a failing or missing ``iptables`` command is logged, not raised.
    """
    check_cmd = ["iptables", "-C", "INPUT", "-s", ip, "-j", "DROP"]
    block_cmd = ["iptables", "-I", "INPUT", "-s", ip, "-j", "DROP"]

    try:
        # The check runs inside the try block too: if the iptables binary is
        # missing, this first call is the one that raises FileNotFoundError.
        result = subprocess.run(
            check_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
        # Exit status 0 from the -C (check) command means the rule exists.
        if result.returncode == 0:
            logger.info(f"IP {ip} already in blacklist, skip")
            return

        subprocess.run(block_cmd, check=True)
        logger.info(f"Block IP: {ip}")
    except subprocess.CalledProcessError:
        logger.error(f"Block {ip} failed, maybe need sudo?")
    except FileNotFoundError:
        logger.error("Not found iptables cmd, check ur system")
def main(log_file="access.log", threshold=100, top_n=100):
    """Scan an access log and block every IP whose request count is too high.

    Args:
        log_file: Path of the access log to scan.
        threshold: An IP is blocked when its request count is strictly
            greater than this value.
        top_n: How many of the most frequent IPs to inspect. Kept separate
            from ``threshold`` because one is a number of IPs and the other
            a number of requests; the defaults match the previous behavior.
    """
    logger.info("===== Start to scan =====")

    top_ips = get_top_ips(log_file_path=log_file, top_n=top_n)
    if not top_ips:
        # Covers both an empty result and a failed read.
        logger.info("No access to data.")
        return

    for ip, count in top_ips:
        if count > threshold:
            logger.warning(f"Found blacklist ip {ip} (request count: {count}), start blocking")
            block_ip(ip)
        else:
            logger.info(f"IP: {ip}")
import re
from collections import Counter
import logging
import os
# Configure the root logger once: INFO and above go both to the
# "ip_blocker.log" file and to the console stream.
logging.basicConfig(
    handlers=[logging.FileHandler("ip_blocker.log"), logging.StreamHandler()],
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("IP_Bocker")
def get_top_ips(log_file_path, top_n = 5):
    """Count the first IPv4 address on each log line; return the most frequent.

    Args:
        log_file_path: Path of the text log to scan (read as UTF-8).
        top_n: Maximum number of ``(ip, count)`` pairs to return.

    Returns:
        A list of ``(ip, count)`` tuples ordered from most to least frequent.
        An empty list is returned, and the problem is logged, when the file
        is missing, unreadable, or any other error occurs.
    """
    ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
    ip_counter = Counter()

    if not os.path.exists(log_file_path):
        logger.error(f"Log file not found: {log_file_path}")
        # Return an empty list (not None) so every path yields the same type.
        return []

    try:
        # errors='replace' keeps one undecodable byte from aborting the scan.
        with open(log_file_path, 'r', encoding='utf-8', errors='replace') as file:
            for line in file:
                match = ip_pattern.search(line)
                if not match:
                    continue
                candidate = match.group()
                # The regex only checks the dotted shape; reject octets > 255
                # so strings such as 999.999.999.999 are never counted.
                if all(int(octet) <= 255 for octet in candidate.split('.')):
                    ip_counter[candidate] += 1
        return ip_counter.most_common(top_n)
    except FileNotFoundError:
        # The file can still vanish between the exists() check and open().
        logger.error(f"Error: The file {log_file_path} does not exist.")
        return []
    except PermissionError:
        logger.error(f"Error: Permission denied to read {log_file_path}.")
        return []
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
        return []
import subprocess
def block_ip(ip):
    """Insert an iptables DROP rule for ``ip`` unless one already exists.

    Args:
        ip: Source address passed to ``iptables -s``.

    Returns:
        None. The outcome (already blocked, blocked, or failed) is logged;
        a failing or missing ``iptables`` command is logged, not raised.
    """
    check_cmd = ["iptables", "-C", "INPUT", "-s", ip, "-j", "DROP"]
    # The command name must be spelled "iptables" here as well; a misspelled
    # binary name always ends in the "not found" branch.
    block_cmd = ["iptables", "-I", "INPUT", "-s", ip, "-j", "DROP"]

    try:
        # The check runs inside the try block too: if the iptables binary is
        # missing, this first call is the one that raises FileNotFoundError.
        result = subprocess.run(
            check_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
        # Exit status 0 from the -C (check) command means the rule exists.
        if result.returncode == 0:
            logger.info(f"IP {ip} already in blacklist, skip")
            return

        subprocess.run(block_cmd, check=True)
        logger.info(f"Block IP: {ip}")
    except subprocess.CalledProcessError:
        # Use the parameter name `ip`; an undefined name in this f-string
        # would raise NameError from inside the handler.
        logger.error(f"Block {ip} failed, maybe need sudo?")
    except FileNotFoundError:
        logger.error("Not found iptables cmd, check ur system")
def main(log_file="access.log", threshold=100, top_n=100):
    """Scan an access log and block every IP whose request count is too high.

    Args:
        log_file: Path of the access log to scan.
        threshold: An IP is blocked when its request count is strictly
            greater than this value.
        top_n: How many of the most frequent IPs to inspect. Kept separate
            from ``threshold`` because one is a number of IPs and the other
            a number of requests; the defaults match the previous behavior.
    """
    logger.info("===== Start to scan =====")

    top_ips = get_top_ips(log_file_path=log_file, top_n=top_n)
    if not top_ips:
        # Covers both an empty result and a failed read.
        logger.info("No access to data.")
        return

    for ip, count in top_ips:
        if count > threshold:
            logger.warning(f"Found blacklist ip {ip} (request count: {count}), start blocking")
            block_ip(ip)
        else:
            logger.info(f"IP: {ip}")