unitas-misc/net-proxy/squid-opnsense-fetchacls/files/fetchACLs-github-download.patch

--- squid-opnsense-fetchacls-24.7.12.py 2025-02-03 09:16:20.922603218 +0100
+++ fetchACLs.py 2025-02-03 09:38:51.971220869 +0100
@@ -54,7 +54,7 @@
""" Download helper
"""
- def __init__(self, url,username, password, timeout, ssl_no_verify=False):
+ def __init__(self, url,username, password, timeout, acl_list, ssl_no_verify=False):
""" init new
:param url: source url
:param timeout: timeout in seconds
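+            :param acl_list: list of ACL categories to keep (an empty list keeps everything)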
@@ -65,6 +65,7 @@
         self._username = username
         self._password = password
         self._ssl_no_verify = ssl_no_verify
+        self._acl_list = acl_list
 
     def fetch(self):
         """ fetch (raw) source data into tempfile using self._source_handle
@@ -115,37 +116,24 @@
     def get_files(self):
         """ process downloaded data, handle compression
-            :return: iterator filename, file handle
+            :return: iterator filename, decoded file content
+            ZIP handling below is written for the UT1 blacklists as published on GitHub:
+            an archive containing multiple category directories with the actual domain
+            lists, e.g. ut1-blacklists-master/blacklists/adult/domains
         """
         if self._source_handle is not None:
-            # handle compressed data
-            if (len(self._url) > 8 and self._url[-7:] == '.tar.gz') \
-                    or (len(self._url) > 4 and self._url[-4:] == '.tgz'):
-                # source is in tar.gz format, extract all into a single string
-                try:
-                    tf = tarfile.open(fileobj=self._source_handle)
-                    for tf_file in tf.getmembers():
-                        if tf_file.isfile():
-                            yield tf_file.name, tf.extractfile(tf_file)
-                except IOError as e:
-                    syslog.syslog(syslog.LOG_ERR, 'proxy acl: error downloading %s (%s)' % (self._url, e))
-            elif len(self._url) > 4 and self._url[-3:] == '.gz':
-                # source is in .gz format unpack
-                try:
-                    gf = gzip.GzipFile(mode='r', fileobj=self._source_handle)
-                    yield os.path.basename(self._url), gf
-                except IOError as e:
-                    syslog.syslog(syslog.LOG_ERR, 'proxy acl: error downloading %s (%s)' % (self._url, e))
-            elif len(self._url) > 5 and self._url[-4:] == '.zip':
-                # source is in .zip format, extract all into a single string
-                with zipfile.ZipFile(self._source_handle,
-                                     mode='r',
-                                     compression=zipfile.ZIP_DEFLATED) as zf:
-                    for item in zf.infolist():
-                        if item.file_size > 0:
-                            yield item.filename, zf.open(item)
-            else:
-                yield os.path.basename(self._url), self._source_handle
-
+            if len(self._url) > 5 and self._url[-4:] == '.zip':
+                with zipfile.ZipFile(self._source_handle, mode='r') as zf:
+                    for item in zf.infolist():
+                        if item.filename.startswith('ut1-blacklists-master/blacklists/'):
+                            # archive members are full paths; keep only the per-category 'domains' files
+                            if check_filter(self, item.filename) and "domains" in item.filename:
+                                with zf.open(item.filename) as file:
+                                    syslog.syslog(syslog.LOG_NOTICE, 'proxy acl: processing %s' % item.filename)
+                                    content = file.read()
+                                    # decode once; download() splits the text into lines
+                                    content = content.decode('utf-8', errors='ignore')
+                                    yield item.filename, content
+            elif (len(self._url) > 8 and self._url[-7:] == '.tar.gz') or (len(self._url) > 4 and self._url[-4:] == '.tgz'):
+                tar = tarfile.open(fileobj=self._source_handle)
+                yield from extract_tar(self, tar)
+
     def download(self):
         """ download / unpack ACL
             :return: iterator filename, type, content
@@ -154,12 +142,40 @@
         for filename, filehandle in self.get_files():
             basefilename = os.path.basename(filename).lower()
             file_ext = filename.split('.')[-1].lower()
-            while True:
-                line = filehandle.readline().decode(encoding='utf-8', errors='ignore')
-                if not line:
-                    break
-                yield filename, basefilename, file_ext, line
+            # get_files() now yields the decoded file content as text
+            for line in filehandle.splitlines():
+                line = line.strip()
+                if line:
+                    yield filename, basefilename, file_ext, line
+
+
+def check_filter(obj, filename):
+    """ check whether a file's category directory passes the configured ACL filter
+        (an empty acl_list keeps everything)
+    """
+    acl_list = obj._acl_list
+    # the second-to-last path component is the category directory, e.g. 'adult'
+    domain = filename.split('/')[-2].lower()
+    if len(acl_list) > 0:
+        return domain in acl_list
+    return True
+
+
+def extract_tar(obj, tar_file, parent_dir=''):
+    """ recursively yield (name, decoded content) for 'domains' files, descending into nested tarballs """
+    for tf_file in tar_file.getmembers():
+        file_name = tf_file.name
+        if tf_file.isfile() and (file_name.endswith('.tar.gz') or file_name.endswith('.tgz')):
+            try:
+                inner_file = tar_file.extractfile(tf_file)
+                inner_tar = tarfile.open(fileobj=inner_file)
+                yield from extract_tar(obj, inner_tar, parent_dir + tf_file.name + '/')
+            except Exception as e:
+                syslog.syslog(syslog.LOG_ERR, 'proxy acl: error extracting tarball %s (%s)' % (obj._url, e))
+        elif tf_file.isfile() and not tf_file.name.endswith("."):
+            if "domains" in tf_file.name and check_filter(obj, tf_file.name):
+                syslog.syslog(syslog.LOG_NOTICE, 'proxy acl: processing %s' % tf_file.name)
+                content = tar_file.extractfile(tf_file).read().decode('utf-8', errors='ignore')
+                yield parent_dir + tf_file.name, content
+
+
 class DomainSorter(object):
     """ Helper class for building sorted squid domain acl list.
@@ -320,7 +336,7 @@
                 sslNoVerify = True
             else:
                 sslNoVerify = False
-            acl = Downloader(download_url, download_username, download_password, acl_max_timeout, sslNoVerify)
+            # pass the configured categories into the downloader so filtering
+            # happens during extraction rather than after the full download
+            acl = Downloader(download_url, download_username, download_password, acl_max_timeout, acl_filters, sslNoVerify)
             all_filenames = list()
             for filename, basefilename, file_ext, line in acl.download():
                 if filename_in_ignorelist(basefilename, file_ext):
@@ -338,16 +354,6 @@
                 if filename not in all_filenames:
                     all_filenames.append(filename)
 
-                if len(acl_filters) > 0:
-                    acl_found = False
-                    for acl_filter in acl_filters:
-                        if acl_filter in filename:
-                            acl_found = True
-                            break
-                    if not acl_found:
-                        # skip this acl entry
-                        continue
-
                 if filetype in targets and targets[filetype]['handle'] is None:
                     targets[filetype]['handle'] = targets[filetype]['class'](targets[filetype]['filename'])
                 if filetype in targets:
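
For reference, a minimal standalone sketch of the category filtering this patch
introduces. The helper name keep_entry and the sample paths are illustrative only
(not part of the patch); the checks mirror the patched get_files() zip branch and
check_filter(): UT1 archive layout, per-category 'domains' files, and an optional
category list passed through the new acl_list argument.

#!/usr/bin/env python3
# hypothetical demo of the patch's filter logic -- not part of fetchACLs.py


def keep_entry(path, acl_list):
    # the same three checks as the patched zip branch: UT1 layout,
    # a per-category 'domains' file, and the optional category filter
    if not path.startswith('ut1-blacklists-master/blacklists/'):
        return False
    if 'domains' not in path:
        return False
    category = path.split('/')[-2].lower()
    return len(acl_list) == 0 or category in acl_list


if __name__ == '__main__':
    entries = [
        'ut1-blacklists-master/blacklists/adult/domains',
        'ut1-blacklists-master/blacklists/adult/urls',
        'ut1-blacklists-master/blacklists/games/domains',
        'ut1-blacklists-master/README.md',
    ]
    for path in entries:
        print('%s -> %s' % (path, keep_entry(path, ['adult'])))
    # with acl_list=['adult'] only blacklists/adult/domains is kept;
    # with an empty acl_list every */domains file passes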