Skip to content

Python 文件操作

1. 打开LOG文件, 然后 封禁top N ip

Logging

使用logging 模块记录日志, 既可以在文件中记录, 也可以在控制台输出

regex

使用regex 模块匹配ip地址

ipaddress 模块

Python 3 自带的 ipaddress 模块不仅仅能提取 IP,最重要的是它能验证 IP 的合法性。re 只能检查格式,但 ipaddress 能告诉你 999.999.999.999 是错误的 IP。

思路: 先用简单的字符串分割(split)拿到疑似 IP 的部分,再交给该模块校验。

优点: 严谨,能自动区分 IPv4 和 IPv6,支持子网计算。

import ipaddress

def extract_valid_ip(raw_string):
    # 假设从日志里 split 出来了一个片段
    potential_ip = raw_string.strip()
    try:
        # 如果不是合法 IP,这里会抛出异常
        ip = ipaddress.ip_address(potential_ip)
        return str(ip)
    except ValueError:
        return None

with open

使用with open 打开文件, 然后 逐行读取文件内容,

collections.Counter

subprocess

subprocess 模块 , 可以用python 调用系统命令行指令(比如 Linux 的 iptables)。

import re
from collections import Counter
import logging
import os

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("ip_blocker.log"),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger("IP_Bocker")



def get_top_ips(log_file_path, top_n = 5):

    ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
    ip_counter = Counter()

    if not os.path.exists(log_file_path):
        logger.error(f"Log file not found: {log_file_path}")
        return

    try:
        with open(log_file_path, 'r', encoding='utf-8') as file:
            for line in file:
                match = ip_pattern.search(line)
                if match:
                    ip_counter[match.group()] += 1

        return ip_counter.most_common(top_n)
    except FileNotFoundError:
        logger.error(f"Error: The file {log_file_path} does not exist.")
        return []
    except PermissionError:
        logger.error(f"Error: Permission denied to read {log_file_path}.")
        return []
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
        return []

import subprocess

def block_ip(ip):
    check_cmd = ["iptables", "-C", "INPUT", "-s", ip, "-j", "DROP"]

    result = subprocess.run(check_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    if result.returncode == 0:
        logger.info(f"IP {ip} already in blacklist, skip")
        return

    block_cmd = ["iptables", "-I", "INPUT", "-s", ip, "-j", "DROP"]

    try:

        subprocess.run(block_cmd, check=True)
        logger.info(f"Block IP: {ip}")

    except subprocess.CalledProcessError as e:
        logger.error(f"Block {ip} failed, maybe need sudo?")
    except FileNotFoundError:
        logger.error("Not found iptables cmd, check ur system")

def main():
    log_file = "access.log"
    threshold = 100

    logger.info("===== Start to scan =====")
    top_ips = get_top_ips(log_file_path=log_file, top_n=threshold)

    if not top_ips:
        logger.info("No access to data.")
        return

    for ip, count in top_ips:
        if count > threshold:
            logger.warning(f"Found blaacklist ip {ip} (request count: {count}), start blocking")
            block_ip(ip)
        else:
            logger.info(f"IP: {ip}")
import re
from collections import Counter
import logging
import os

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("ip_blocker.log"),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger("IP_Bocker")



def get_top_ips(log_file_path, top_n = 5):

    ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
    ip_counter = Counter()

    if not os.path.exists(log_file_path):
        logger.error(f"Log file not found: {log_file_path}")
        return

    try:
        with open(log_file_path, 'r', encoding='utf-8') as file:
            for line in file:
                match = ip_pattern.search(line)
                if match:
                    ip_counter[match.group()] += 1

        return ip_counter.most_common(top_n)
    except FileNotFoundError:
        logger.error(f"Error: The file {log_file_path} does not exist.")
        return []
    except PermissionError:
        logger.error(f"Error: Permission denied to read {log_file_path}.")
        return []
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
        return []

import subprocess

def block_ip(ip):
    check_cmd = ["iptables", "-C", "INPUT", "-s", ip, "-j", "DROP"]

    result = subprocess.run(check_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    if result.returncode == 0:
        logger.info(f"IP {ip} already in blacklist, skip")
        return

    block_cmd = ["iptablles", "-I", "INPUT", "-s", ip, "-j", "DROP"]

    try:

        subprocess.run(block_cmd, check=True)
        logger.info(f"Block IP: {ip}")

    except subprocess.CalledProcessError as e:
        logger.error(f"Block {IP} failed, maybe need sudo?")
    except FileNotFoundError:
        logger.error("Not found iptables cmd, check ur system")

def main():
    log_file = "access.log"
    threshold = 100

    logger.info("===== Start to scan =====")
    top_ips = get_top_ips(log_file_path=log_file, top_n=threshold)

    if not top_ips:
        logger.info("No access to data.")
        return

    for ip, count in top_ips:
        if count > threshold:
            logger.warning(f"Found blaacklist ip {ip} (request count: {count}), start blocking")
            block_ip(ip)
        else:
            logger.info(f"IP: {ip}")