PyCon Italia 2019
Internet delle cose con Redis e django-channels (Internet of Things with Redis and django-channels)

Development of vertical software solutions for medium to large businesses
Intranet, extranet, web services and IoT
Focused on technologies such as Python, Django, Redis, WebSockets, HTMX, IoT.
Real-time data acquisition, from Arduino to the web, using PubSub with Redis, Django and other friends
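For a taste of the approach, here is a minimal redis-py pub/sub sketch; the channel name, host and payload are invented for the example:

import json
import redis

r = redis.Redis(host='localhost', port=6379)

# Publisher side: e.g. a gateway forwarding readings from an Arduino
r.publish('sensors', json.dumps({'device': 'arduino-01', 'temperature': 21.5}))

# Subscriber side: e.g. a django-channels consumer relaying data to WebSockets
p = r.pubsub()
p.subscribe('sensors')
for message in p.listen():
    # listen() also yields subscription confirmations; keep data messages only
    if message['type'] == 'message':
        print(json.loads(message['data']))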
A Django app which provides advanced integration for a Django project with the jQuery JavaScript library DataTables.net, when used in server-side processing mode.
Demo site
A Django helper app to add editing capabilities to the frontend using modal forms.
Demo site
A Django app to run new background tasks from either admin or cron, and inspect task history from admin; based on django-rq
A collection of tools to trace, analyze and render Querysets
A helper used to remove the oldest records from specific db tables in a Django project
This script searches for a specific filename (or substring) across all snapshots in a Restic repository. It retrieves the list of snapshots in JSON format, sorts them in descending chronological order (newest first), and then scans each snapshot for paths matching the provided search term (case-insensitive).
For each snapshot where a match is found, the script prints the snapshot's short ID and timestamp, followed by the matching paths.
Use case: quickly finding which snapshots still contain a given file, for example before restoring a file that was deleted by mistake.
file search_restic_file.bash
#!/bin/bash

if ! command -v jq &> /dev/null; then
    echo "Error: jq is required but not installed. Install it with: apt install jq"
    exit 1
fi

if [ -z "$1" ]; then
    echo "Usage: $0 filename"
    exit 1
fi

SEARCH_TERM="$1"
echo "Searching for \"$SEARCH_TERM\" in Restic snapshots (newest first)..."

# Get JSON once
JSON=$(restic snapshots --json) || { echo "Error: failed to get snapshots"; exit 1; }

# Extract sorted short_id and time together
# Output format: "<short_id> <time>"
SORTED=$(echo "$JSON" | jq -r 'sort_by(.time) | reverse | .[] | "\(.short_id) \(.time)"')

if [ -z "$SORTED" ]; then
    echo "No snapshots found."
    exit 1
fi

# Loop through each line: first column is ID, rest is time
while read -r ID TIME; do
    [ -z "$ID" ] && continue
    MATCHES=$(restic ls "$ID" 2>/dev/null | grep -iF "$SEARCH_TERM")
    if [ -n "$MATCHES" ]; then
        echo "Found in snapshot $ID (time: $TIME):"
        echo "$MATCHES"
        echo "--------------------------------"
    fi
done <<< "$SORTED"
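For example, assuming the repository location and password are already exported in the environment (RESTIC_REPOSITORY, RESTIC_PASSWORD), a search for a hypothetical file looks like this:

chmod +x search_restic_file.bash
./search_restic_file.bash invoice_2024.pdf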
Save into '/usr/local/bin/testmail', then:
chmod +x /usr/local/bin/testmail
#!/usr/bin/env python3
import os
import configparser
from pathlib import Path
import argparse


def run_command(command, dry_run):
    print("\x1b[1;37;40m" + command + "\x1b[0m")
    if not dry_run:
        os.system(command)


class Config:

    def __init__(self):
        self._config = configparser.ConfigParser()
        if not self.get_config_filename().exists():
            self._config['general'] = {
                'counter': 1,
            }
            self.save_config()
            print(f"Default config written to {self.get_config_filename()}")
        self.read_config()

    def get_config_filename(self):
        return Path(__file__).parent / "testmail.ini"

    def read_config(self):
        self._config.read(self.get_config_filename())

    def save_config(self):
        with open(self.get_config_filename(), 'w') as f:
            self._config.write(f)

    def get_counter(self, increment=True):
        value = self._config.getint('general', 'counter')
        if increment:
            self.increment_counter()
        return value

    def increment_counter(self):
        self._config['general']['counter'] = str(self.get_counter(increment=False) + 1)
        self.save_config()

    def reset_counter(self):
        self._config['general']['counter'] = '1'
        self.save_config()

    def get_section(self, section):
        section = self._config[section]
        return dict(section.items())


def build_swaks_command(counter, config, recipient):
    """
    swaks --to you@yourdomain.com \
        --from smtp@helpdesk.brainstorm.it \
        --server 127.0.0.1 \
        --auth LOGIN \
        --auth-user smtp@helpdesk.brainstorm.it \
        --auth-password e67ca7ef5c85e56a35e7a22b6be4b2d5 \
        --port 587

    Sample section in config file:

    [swaks]
    from = "Brainstorm helpdesk <smtp@helpdesk.brainstorm.it>"
    server = 127.0.0.1
    auth = LOGIN
    auth-user = smtp@helpdesk.brainstorm.it
    auth-password = **********************
    port = 587
    """
    section = config.get_section('swaks')
    command = f"swaks --to {recipient}"
    command += f' --header "Subject: Test mail {counter}"'
    for key, value in section.items():
        command += f" --{key} {value}"
    return command


def build_mail_command(counter, recipient):
    command = 'echo "Test mail %d; created on: `date`" | mail -s "Test mail %d" %s' % (
        counter,
        counter,
        recipient,
    )
    return command


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Sends a test email to the specified recipient")
    parser.add_argument('recipient')
    parser.add_argument('--use-swaks', '-s', action='store_true')
    parser.add_argument('--dry-run', '-d', action='store_true')
    args = parser.parse_args()
    config = Config()
    counter = config.get_counter(increment=not args.dry_run)
    if args.use_swaks:
        command = build_swaks_command(counter, config, args.recipient)
    else:
        command = build_mail_command(counter, args.recipient)
    run_command(command, args.dry_run)
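For example (the recipient address is a placeholder): first preview the generated command without sending anything or incrementing the counter, then actually send via swaks:

testmail someone@example.com --dry-run
testmail someone@example.com --use-swaks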
My implementation:
import csv
from django.db import models
from django.db import connection
from django.utils import timezone
from django.contrib.humanize.templatetags.humanize import intcomma

from ccms_lib.backend.models import CODE_MAX_LENGTH


class SqlViewMixin:

    @classmethod
    def create_view(cls, verbose=False):
        view_name = cls._meta.db_table
        cls_sql = cls.sql().strip()
        if cls_sql.endswith(';'):
            cls_sql = cls_sql[:-1]
        if cls.materialized:
            sql = f"CREATE MATERIALIZED VIEW IF NOT EXISTS {view_name} AS "
            sql += cls_sql + ';\n'
            pk_name = cls._meta.pk.name
            sql += f'CREATE UNIQUE INDEX IF NOT EXISTS idx_{view_name} ON {view_name}({pk_name});'
        else:
            sql = f"CREATE VIEW {view_name} AS "
            sql += cls_sql + ';\n'
        cls.execute(sql, verbose)

    @classmethod
    def drop_view(cls, verbose=False):
        view_name = cls._meta.db_table
        sql = "DROP %sVIEW IF EXISTS %s CASCADE;\n" % (
            'MATERIALIZED ' if cls.materialized else '',
            view_name,
        )
        sql += f"DROP INDEX IF EXISTS idx_{view_name};"
        cls.execute(sql, verbose)

    @classmethod
    def refresh_view(cls, concurrently=False, verbose=False):
        if cls.materialized:
            sql = "REFRESH MATERIALIZED VIEW %s%s;" % (
                "CONCURRENTLY " if concurrently else '',
                cls._meta.db_table,
            )
            cls.execute(sql, verbose)

    @classmethod
    def execute(cls, sql, verbose):
        with connection.cursor() as cursor:
            if verbose:
                print(sql)
            cursor.execute(sql)

    @classmethod
    def export_view_as_csv(cls, verbose, filename=None, delimiter=','):

        def export_rows(cursor, sql, offset, page_size, writer, verbose):
            if page_size > 0:
                sql = sql + " OFFSET %d LIMIT %d" % (offset, page_size)
            if verbose:
                print(sql)
            cursor.execute(sql)
            if offset <= 0:
                # Write the header row before the first page only
                writer.writerow([f.name for f in cursor.description])
            for row in cursor.fetchall():
                writer.writerow(row)

        view_name = cls._meta.db_table
        if filename is None:
            filename = timezone.now().strftime('%Y-%m-%d_%H-%M-%S__') + view_name + '.csv'
        page_size = 1000000  # 0 = no pagination
        sql = str(cls.objects.all().query)
        sql += ' ORDER BY "%s"' % cls._meta.pk.name
        # n is needed by the pagination loop as well, so compute it unconditionally
        n = cls.objects.count()
        step = max(int(n / 100), 1)
        if verbose:
            print('Exporting file "%s"; records: %s' % (filename, intcomma(n)))
        with open(filename, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile, dialect='excel', delimiter=delimiter)
            with connection.cursor() as cursor:
                if page_size <= 0:
                    # No pagination: stream the whole resultset
                    cursor.execute(sql)
                    writer.writerow([f.name for f in cursor.description])
                    j = 0
                    row = cursor.fetchone()
                    while row is not None:
                        j += 1
                        if verbose and (j == 1 or j % step == 0):
                            progress = int((j * 100) / n) + 1
                            print('%d%% (%s/%s)' % (progress, intcomma(j), intcomma(n)))
                        writer.writerow(row)
                        row = cursor.fetchone()
                else:
                    # Paginate with OFFSET/LIMIT
                    num_pages = (n // page_size) + (1 if n % page_size else 0)
                    offset = 0
                    j = 0
                    while offset < n:
                        if verbose:
                            progress = int(((j * page_size) * 100) / n)
                            print('page %d/%d (%d%%)' % (j + 1, num_pages, progress))
                        export_rows(cursor, sql, offset, page_size, writer, verbose=False)
                        offset += page_size
                        j += 1


################################################################################
# Example ...
class DwhBase(SqlViewMixin, models.Model):

    materialized = True

    id = models.CharField(null=False, max_length=256, primary_key=True)
    base_code = models.CharField(max_length=CODE_MAX_LENGTH, null=False)
    base_name = models.CharField(max_length=256, null=False)

    class Meta:
        managed = False
        db_table = "dwhm_base"

    def __str__(self):
        return str(self.id)

    @staticmethod
    def sql():
        return """
            SELECT
                code AS id,
                code AS base_code,
                description AS base_name
            FROM backend_base;
        """
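A minimal sketch of how the mixin can be driven, for example from a Django shell or a management command (same model and flags as defined above; note that CONCURRENTLY requires the unique index created by create_view):

DwhBase.create_view(verbose=True)                      # create the materialized view + unique index
DwhBase.refresh_view(concurrently=True, verbose=True)  # repopulate without locking readers
DwhBase.export_view_as_csv(verbose=True)               # dump to a timestamped CSV file
DwhBase.drop_view(verbose=True)                        # clean up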
References:
pip install rpdb
import rpdb; rpdb.set_trace()  # listens on 127.0.0.1:4444 by default; use rpdb.set_trace(addr='0.0.0.0', port=4444) to accept remote connections
and finally debug with:
nc <target_ip> 4444
file "export_gitlab_issues.py"
#!/usr/bin/env python3
import pprint
import json
import gitlab
import argparse
import markdown

# Requirements
# python-gitlab==4.11.1
# Markdown-3.7

GITLAB_URL = "https://gitlab.brainstorm.it"
GITLAB_PRIVATE_TOKEN = "glpat-******************"
GITLAB_PROJECT = 77
PAGE_BREAK_STRING = '<div style="page-break-after: always;"></div>'


class GitlabClient(gitlab.Gitlab):

    def __init__(self, project_id):
        super().__init__(GITLAB_URL, GITLAB_PRIVATE_TOKEN, order_by='created_at')
        self.project = self.projects.get(project_id)

    def retrieve_issue(self, issue_id):
        issue = self.project.issues.get(issue_id)
        base_url = issue.web_url  # E.g.: 'https://gitlab.brainstorm.it/group/project/-/issues/36'
        position = base_url.find('/-/')
        if position >= 0:
            base_url = base_url[:position]  # E.g.: 'https://gitlab.brainstorm.it/group/project'
        data = json.loads(issue.to_json())
        self.fix_image_links(data, base_url)
        data['notes'] = []
        notes = issue.notes.list(all=True)
        notes = sorted([n for n in notes if not n.system], key=lambda k: k.created_at)
        for n in notes:
            note = json.loads(n.to_json())
            self.fix_image_links(note, base_url)
            data['notes'].append(note)
        return data

    def fix_image_links(self, data, base_url):
        text = data.get('description', data.get('body'))
        text = text.replace('/uploads/', base_url + '/uploads/')
        if 'description' in data:
            data['description'] = text
        else:
            data['body'] = text


def to_markdown(data):
    text = ""
    for k, v in data.items():
        text += '\n# [%d] %s\n\n' % (k, v['title'])
        text += "### %s (%s)\n\n" % (v['author']['name'], v['created_at'])
        text += "**Link**: %s\n\n" % v['web_url']
        text += v['description']
        text += "\n\n"
        for n in v['notes']:
            text += "\n\n" + (80 * '-') + "\n\n"
            text += "### %s (%s)\n\n" % (n['author']['name'], n['created_at'])
            text += n['body']
            text += "\n\n"
        text += "\n\n" + PAGE_BREAK_STRING
    return text


def markdown_to_html(md_text):
    extensions = [
        "tables",
        "fenced_code",
        "codehilite",
        "toc",
    ]
    html = markdown.markdown(
        md_text,
        extensions=extensions,
    )
    return html


def markdown_to_pdf(md_file, pdf_file, verbose=False):
    # Read Markdown file
    with open(md_file, "r", encoding="utf-8") as file:
        md_text = file.read()
    html_text = markdown_to_html(md_text)
    return html_to_pdf(html_text, pdf_file, verbose=verbose)


def html_to_pdf(html_text, pdf_file, verbose=False):
    from weasyprint import HTML
    from pygments.formatters import HtmlFormatter

    # Generate Pygments CSS for styling
    pygments_css = HtmlFormatter().get_style_defs('.codehilite')

    # Custom CSS for better styling
    css = """
        body { font-family: Arial, sans-serif; padding: 20px; }
        pre { background: #f4f4f4; padding: 10px; border-radius: 5px; overflow-x: auto; font-size: 10px; }
        code { font-family: monospace; }
        img { display: block; max-width: 98%; height: auto; margin: 0 auto; border: 2px solid #ccc; }
    """ + pygments_css

    # Wrap HTML with styling
    html = f"""
        <html>
        <head>
        <style>{css}</style>
        </head>
        <body>
        {html_text}
        </body>
        </html>"""

    # Convert HTML to PDF
    if verbose:
        print(html)
    print('build "%s" ...' % pdf_file)
    HTML(string=html).write_pdf(pdf_file)
    print('done')


def main():
    parser = argparse.ArgumentParser(description="...")
    parser.add_argument('issue_ids', nargs="+", type=int)
    parser.add_argument("--project_id", "-p", type=int, default=GITLAB_PROJECT, help="Default: %d" % GITLAB_PROJECT)
    parser.add_argument("--format", type=str, choices=["", "json", "markdown", "html", "pdf"], default="")
    parser.add_argument("--filename", type=str, default="result.pdf",
                        help='filename when required (i.e.: PDF format); default: "result.pdf"')
    args = parser.parse_args()
    client = GitlabClient(args.project_id)
    data = {}
    for issue_id in args.issue_ids:
        issue_data = client.retrieve_issue(issue_id)
        data[issue_id] = issue_data
    if args.format == 'json':
        print(json.dumps(data, indent=4))
    elif args.format == 'markdown':
        print(to_markdown(data))
    elif args.format == 'html':
        md = to_markdown(data)
        html = markdown_to_html(md)
        print(html)
    elif args.format == 'pdf':
        md = to_markdown(data)
        html = markdown_to_html(md)
        html_to_pdf(html, args.filename, verbose=False)
    else:
        pprint.pprint(data)


if __name__ == '__main__':
    main()
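Typical invocations (issue numbers and output filename are made up for the example):

./export_gitlab_issues.py 36 --format json
./export_gitlab_issues.py 36 42 --format pdf --filename issues_36_42.pdf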
file "export_gitlab_issues.py"
#!/usr/bin/env python3 import pprint import json import gitlab import argparse import markdown # Requirements # python-gitlab==4.11.1 # Markdown-3.7 GITLAB_URL = "https://gitlab.brainstorm.it" GITLAB_PRIVATE_TOKEN = "glpat-******************" GITLAB_PROJECT = 77 PAGE_BREAK_STRING = '<div style="page-break-after: always;"></div>' class GitlabClient(gitlab.Gitlab): def __init__(self, project_id): super().__init__(GITLAB_URL, GITLAB_PRIVATE_TOKEN, order_by='created_at') self.project = self.projects.get(project_id) def retrieve_issue(self, issue_id): issue = self.project.issues.get(issue_id) base_url = issue.web_url # Es: 'https://gitlab.brainstorm.it/group/project/-/issues/36' position = base_url.find('/-/') if position >= 0: base_url = base_url[:position] # Es: 'https://gitlab.brainstorm.it/group/project' data = json.loads(issue.to_json()) self.fix_image_links(data, base_url) data['notes'] = [] notes = issue.notes.list(all=True) notes = sorted([n for n in notes if not n.system], key=lambda k: k.created_at) for n in notes: note = json.loads(n.to_json()) self.fix_image_links(note, base_url) data['notes'].append(note) return data def fix_image_links(self, data, base_url): text = data.get('description', data.get('body')) text = text.replace('/uploads/', base_url+'/uploads/') if 'description' in data: data['description'] = text else: data['body'] = text def to_markdown(data): text = "" for k, v in data.items(): text += '\n# [%d] %s\n\n' % (k, v['title']) text += "### %s (%s)\n\n" % (v['author']['name'], v['created_at']) text += "**Link**: %s\n\n" % v['web_url'] text += v['description'] text += "\n\n" for n in v['notes']: text += "\n\n" + (80*'-') + "\n\n" text += "### %s (%s)\n\n" % (n['author']['name'], v['created_at']) text += n['body'] text += "\n\n" text += "\n\n" + PAGE_BREAK_STRING return text def markdown_to_html(md_text): extensions = [ "tables", "fenced_code", "codehilite", "toc" ] #safe_mode = True html = markdown.markdown( md_text, extensions=extensions, # safe_mode=safe_mode, # enable_attributes=(not safe_mode), ) return html def markdown_to_pdf(md_file, pdf_file, verbose=False): # Read Markdown file with open(md_file, "r", encoding="utf-8") as file: md_text = file.read() html_text = markdown_to_html(md_text) return html_to_pdf(html_text, pdf_file, verbose=False) def html_to_pdf(html_text, pdf_file, verbose=False): from weasyprint import HTML from pygments.formatters import HtmlFormatter # Generate Pygments CSS for styling pygments_css = HtmlFormatter().get_style_defs('.codehilite') # Custom CSS for better styling css = """ body { font-family: Arial, sans-serif; padding: 20px; } pre { background: #f4f4f4; padding: 10px; border-radius: 5px; overflow-x: auto; font-size: 10px; } code { font-family: monospace; } img { display: block; max-width: 98%; height: auto; margin: 0 auto; border: 2px solid #ccc; } """ + pygments_css # Wrap HTML with styling html = f""" <html> <head> <style>{css}</style> </head> <body> {html_text} </body> </html>""" # Convert HTML to PDF if verbose: print(html) print('build "%s" ...' 
% pdf_file) HTML(string=html).write_pdf(pdf_file) print('done') def main(): parser = argparse.ArgumentParser(description="...") parser.add_argument('issue_ids', ="+", type=int) parser.add_argument("--project_id", "-p", type=int, default=GITLAB_PROJECT, help="Default: %d" % GITLAB_PROJECT) parser.add_argument("--format", type=str, choices=["", "json", "markdown", "html", "pdf", ], default="") parser.add_argument("--filename", type=str, default="result.pdf", help='filename when required (i.e.: PDF format); default: "result.pdf"') args = parser.parse_args() project_id = args.project_id client = GitlabClient(GITLAB_PROJECT) data = {} for issue_id in args.issue_ids: issue_data = client.retrieve_issue(issue_id) data[issue_id] = issue_data if args.format == 'json': print(json.dumps(data, indent=4)) if args.format == 'markdown': print(to_markdown(data)) elif args.format == 'html': md = to_markdown(data) html = markdown_to_html(md) print(html) elif args.format == 'pdf': md = to_markdown(data) html = markdown_to_html(md) html_to_pdf(html, args.filename, verbose=False) else: pprint.pprint(data) if __name__ == '__main__': main()
Using requests:
import requests


# https://stackoverflow.com/questions/16694907/download-large-file-in-python-with-requests#16696317
def download_file(url):
    local_filename = url.split('/')[-1]
    # NOTE the stream=True parameter below
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(local_filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                # If you have a chunk-encoded response, uncomment the "if"
                # below and set the chunk_size parameter to None.
                # if chunk:
                f.write(chunk)
    return local_filename
Using urllib.request:
import urllib.request


def download_file2(url):
    """
    Use urllib.request instead of requests to avoid "403 Forbidden" errors
    when the remote server tries to prevent scraping (for example when it
    is protected behind Cloudflare)
    """
    request = urllib.request.Request(url)
    request.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:106.0) Gecko/20100101 Firefox/106.0')
    request.add_header('Accept', 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8')
    request.add_header('Accept-Language', 'en-US,en;q=0.5')
    response = urllib.request.urlopen(request)
    local_filename = url.split('/')[-1]
    chunk_size = 8192
    with open(local_filename, 'wb') as f:
        size = 0
        while True:
            info = response.read(chunk_size)
            if len(info) < 1:
                break
            size = size + len(info)
            f.write(info)
    return local_filename
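Both helpers save the file in the current directory, deriving the local name from the last segment of the URL; the URL below is just a placeholder:

download_file('https://example.com/files/archive.zip')
download_file2('https://example.com/files/archive.zip')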
Ensure that each tenant has its own unique subdomain. For example, tenant1.yourblog.com should only show content specific to Tenant 1.
Use Django’s canonical URLs to avoid duplicate content issues across tenants.
Generate separate sitemaps for each tenant to help search engines crawl their content.
Include dynamic meta descriptions that are specific to each tenant’s content.
Allow or disallow search engines from indexing tenants based on your requirements (see the sketch below).
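As an illustration of the last point, a minimal sketch of a per-tenant robots.txt view; it assumes a hypothetical tenant middleware that annotates the request with a tenant object exposing an indexable flag:

from django.http import HttpResponse
from django.contrib.sites.shortcuts import get_current_site


def robots_txt(request):
    """
    Serve a per-tenant robots.txt: tenants flagged as private are
    excluded from indexing, public ones expose their own sitemap.
    """
    site = get_current_site(request)  # resolves the tenant's subdomain
    tenant = getattr(request, 'tenant', None)  # set by tenant middleware (hypothetical)
    if tenant is not None and tenant.indexable:
        lines = [
            "User-agent: *",
            "Allow: /",
            f"Sitemap: https://{site.domain}/sitemap.xml",
        ]
    else:
        lines = [
            "User-agent: *",
            "Disallow: /",
        ]
    return HttpResponse("\n".join(lines), content_type="text/plain")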
References:
from faker import Faker

faker = Faker()
names = dir(faker)
for name in names:
    try:
        value = getattr(faker, name)()
        text = str(value)[:40]
        print('%-30.30s %s' % (name, text))
    except Exception:
        # Skip attributes that are not zero-argument providers
        pass
file "list_promethues_metrics_and_labels.py"
#!/usr/bin/env python3
import requests
import json
import argparse

PROM_URL = "http://localhost:9090"


def list_metrics_with_labels():
    data = {}
    r = requests.get(f"{PROM_URL}/api/v1/label/__name__/values")
    metric_names = r.json()["data"]
    for name in sorted(metric_names):
        r2 = requests.get(f"{PROM_URL}/api/v1/series", params={"match[]": name})
        series = r2.json()["data"]
        label_set = set()
        for s in series:
            label_set.update(s.keys())
        label_set.discard("__name__")
        data[name] = list(label_set)
    return data


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('-f', '--filter', type=str, help="filter keys")
    parser.add_argument('-m', '--metrics-only', action='store_true', help="List only metrics (exclude labels)")
    parser.add_argument('-i', '--indent', type=int, default=4, help="No indent if < 0")
    args = parser.parse_args()
    data = list_metrics_with_labels()
    if args.filter:
        data = {k: v for k, v in data.items() if args.filter in k}
    indent = args.indent
    if indent < 0:
        indent = None
    if args.metrics_only:
        print(json.dumps(list(data.keys()), indent=indent))
    else:
        print(json.dumps(data, indent=indent))
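For example, to print only the metric names containing a given substring, as a compact JSON list (the "django" filter is just an example):

./list_promethues_metrics_and_labels.py -f django -m -i -1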
file "list_promethues_metrics_and_labels.py"
#!/usr/bin/env python3 import requests import json import argparse PROM_URL = "http://localhost:9090" def list_metrics_with_labels(): data = {} r = requests.get(f"{PROM_URL}/api/v1/label/__name__/values") metric_names = r.json()["data"] for name in sorted(metric_names): r2 = requests.get(f"{PROM_URL}/api/v1/series", params={"match[]": name}) series = r2.json()["data"] label_set = set() for s in series: label_set.update(s.keys()) label_set.discard("__name__") data[name] = list(label_set) return data if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('-f', '--filter', type=str, help="filter keys") parser.add_argument('-m', '--metrics-only', action='store_true', help="List only metrics (exclude labels)") parser.add_argument('-i', '--indent', type=int, default=4, help="No indent if < 0") args = parser.parse_args() data = list_metrics_with_labels() if args.filter: data = { k: v for k, v in data.items() if args.filter in k } indent = args.indent if indent < 0: indent = None if args.metrics_only: print(json.dumps(list(data.keys()), indent=indent)) else: print(json.dumps(data, indent=indent))
Save into '/usr/local/bin/monitor_mail', then:
chmod +x /usr/local/bin/monitor_mail
#!/usr/bin/env python3
import time
import re
import argparse

# Strong styles for primary fields
STATUS_STYLES = {
    'sent': '\033[97;42m',       # white on green
    'bounced': '\033[97;41m',    # white on red
    'deferred': '\033[30;44m',   # black on blue
    'expired': '\033[97;45m',    # white on magenta
    'reject': '\033[97;41m',     # white on red
    'discarded': '\033[30;47m',  # black on white
    'hold': '\033[30;46m',       # black on cyan
    'error': '\033[97;41m',      # white on red
}
DEFAULT_STYLE = '\033[30;43m'   # black on yellow
SECONDARY_STYLE = '\033[94m'    # light blue
RESET = '\033[0m'


def highlight_status_and_to(line, use_color=True):
    """
    Highlights:
    - 'status=...' and 'to=<...>' with background color
    - 'from=...', 'relay=...', 'message-id=...' with light foreground color
    Skips orig_to.
    """
    status_match = re.search(r'status=([a-zA-Z0-9_]+)', line)
    if not status_match:
        return None
    status_value = status_match.group(1)
    if use_color:
        strong_style = STATUS_STYLES.get(status_value, DEFAULT_STYLE)
        secondary_style = SECONDARY_STYLE
    else:
        strong_style = ''
        secondary_style = ''
    reset = RESET if use_color else ''

    # Primary highlights
    line = re.sub(
        r'status=([a-zA-Z0-9_]+)',
        lambda m: f"{strong_style}{m.group(0)}{reset}",
        line
    )
    line = re.sub(
        r'(?<!orig_)to=<[^>]+>',
        lambda m: f"{strong_style}{m.group(0)}{reset}",
        line
    )

    # Secondary highlights
    line = re.sub(
        r'from=<[^>]+>',
        lambda m: f"{secondary_style}{m.group(0)}{reset}",
        line
    )
    line = re.sub(
        r'relay=[^ ,]+',
        lambda m: f"{secondary_style}{m.group(0)}{reset}",
        line
    )
    line = re.sub(
        r'message-id=<[^>]+>',
        lambda m: f"{secondary_style}{m.group(0)}{reset}",
        line
    )
    return line


def tail_f(filepath, show_all=False, from_start=False, use_color=True, only_status=None):
    """
    Tail -f with optional filtering, coloring and status filtering.
    """
    with open(filepath, 'r') as f:
        if not from_start:
            f.seek(0, 2)
        while True:
            line = f.readline()
            if not line:
                time.sleep(0.1)
                continue
            line = line.rstrip()
            # Extract status value (if any)
            status_match = re.search(r'status=([a-zA-Z0-9_]+)', line)
            status_value = status_match.group(1) if status_match else None
            if only_status and status_value != only_status:
                continue  # Skip line if it doesn't match the --only-status filter
            highlighted = highlight_status_and_to(line, use_color=use_color)
            if highlighted:
                print(highlighted)
            elif show_all:
                print(line)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Tail and highlight Postfix log lines")
    parser.add_argument(
        "--show-all",
        action="store_true",
        help="Show all lines (not just those with 'status=...')"
    )
    parser.add_argument(
        "--from-start",
        action="store_true",
        help="Start reading from the beginning of the file instead of the end"
    )
    parser.add_argument(
        "--no-color",
        action="store_true",
        help="Disable all ANSI color output (monochrome)"
    )
    parser.add_argument(
        "--only-status",
        metavar="STATUS",
        help="Show only lines with a specific status (e.g. bounced, sent, deferred)"
    )
    args = parser.parse_args()

    # Default path to Postfix log
    log_file_path = "/var/log/mail.log"

    # Start tailing
    tail_f(
        filepath=log_file_path,
        show_all=args.show_all,
        from_start=args.from_start,
        use_color=not args.no_color,
        only_status=args.only_status
    )
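For example, to review only bounced deliveries, scanning the whole log from the beginning (run as a user able to read /var/log/mail.log):

monitor_mail --only-status bounced --from-start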
Built to verify the code our applications use internally to access databases through the ADO interface, this tool lets you run interactive queries against a generic database (MySQL, Access, SQL Server, Oracle, etc.). It can also export the structure and contents of a generic database as an SQL script compatible with MySQL or SQL Server.
Click here to download

Sends ASCII or binary data to a generic device, either over a serial line or via Ethernet (TCP). Automatic replies can also be configured to simulate a generic communication protocol. It supports serial and/or Ethernet connections, TCP or UDP (both client and server side), HTTP, and the XModem serial protocol.
Click here to download

Used to distribute source code in encrypted form. The decryption key is supplied individually to each customer.
Click here to download

BRAINSTORM S.r.l. di Mario Orlandi & C.
Viale Crispi, 2
41121 Modena, Italy
VAT number (P.IVA): 02409140361
SDI code: USAL8PV
PEC address: brainstormsnc@pec.it
ATECO 2025 code: 62.10.00
REA number: MO - 300003
Tel: (+39) 059 216138