Source code for fruafr.log.logtosyslog

#!/usr/bin/env python
# pylint: disable=line-too-long
"""
CLI - Save a log message to a log file

No output is written to the console by default (otherwise, use verbose)

"""
# Copyright 2023 by David Heurtevent.
# SPDX_LICENSE: MIT
# License: MIT License
# Author: David HEURTEVENT <david@heurtevent.org>

import argparse
import logging
from logging import handlers
import sys
import socket

from fruafr.log.lib import common
from fruafr.log import logtoconsole

# Defaults
DEFAULT_LOG_FILE = '/tmp/python-tinysyslogserver.log'
LEVEL = 'info'
SEP = ' - '
OPTSEP = SEP
DEFAULT_SYSLOG_ADDR = '/dev/log'
DEFAULT_SYSLOG_PORT = '514'


[docs] class Console(logtoconsole.Console): """Class Console Parses the command line arguments Extends/Replaces logtoconsole.Console """
[docs] def parse_args(self, args) -> argparse.Namespace: """Parse the arguments from the command line Args: args (list): the list of arguments from the command line Returns: the argparse.Namespace object containing the parsed arguments """ parser = argparse.ArgumentParser( prog = 'CLI - Send a log message to a syslog server\n', description = 'Options after --nolevel modify the content of the message before it is logged.', epilog = 'No output is written to the console by default (to override, use -v or --verbose)') # arguments parser.add_argument('message', help='The message to log. Ignored if -MES or --message is specified (please set it to . to avoid error messsages)') parser.add_argument('-L', '--level', dest='level', choices=['debug', 'info', 'warning', 'error', 'critical'], default=f"{LEVEL}", help=f"The level [debug, info, warning, error, critical]. [Default: {LEVEL}]") parser.add_argument('-a', '--addr', dest='addr', help=f"syslog address. can be localhost or a DNS name or an IP address or a string. [Default:{DEFAULT_SYSLOG_ADDR}]", default=f"{DEFAULT_SYSLOG_ADDR}") parser.add_argument('-p', '--port', dest='port', default=f"{DEFAULT_SYSLOG_PORT}", help=f"syslog port [Defauls: {DEFAULT_SYSLOG_PORT}]") parser.add_argument('-F', '--facility', dest='facility', default='user', help='syslog facility [Default: user]') parser.add_argument('-t', '--tcp', dest='tcp', action='store_true', default=False, help='syslog port is tcp [Defauls is udp]') parser.add_argument('-P', '--program', dest='program', default='logtosyslog', help='program [Defauls is logtosyslog]') parser.add_argument('-s', '--sep', dest='sep', help=f"Separator [Default: {SEP}]", default=f"{SEP}") parser.add_argument('-f', '--format', dest='format', help='Python logging format') parser.add_argument('-df', '--dateformat', dest='dateformat', help='Datetime format') parser.add_argument('-v', '--verbose', action='store_true', dest='verbose', default=False, help='Verbose output') parser.add_argument('-d', '--dryrun', action='store_true', dest='dryrun', default=False, help='Dry run - Generate log message on console without sending it over syslog. Used for debugging') # Additional flags parser.add_argument('--noasctime', dest='noasctime', action='store_true', default=False, help='Removes asctime') parser.add_argument('--nolevel', dest='nolevel', action='store_true', default=False, help='Removes level') # Additional arguments that can be passed to append to the message beforehand parser.add_argument('-o', '--options', dest='options', help='order of optional fields to template (comma separated string). Values not in this option are ignored even when provided to the cli') parser.add_argument('-osep', '--optsep', dest='optsep', help=f"Separator for the additional options [Default: {OPTSEP}]", default=f"{OPTSEP}") for option in common.CLI_OPTIONS.items(): parser.add_argument( option[1]['short_option'], option[1]['long_option'], dest = option[1]['dest'], help = option[1]['help'], default = option[1]['default'] ) # return return parser.parse_args(args)
def _prepare_sys_logger(self, addr: str, facility: int, fmt: str, datefmt: str, socktype: int = socket.SOCK_DGRAM, port: str = DEFAULT_SYSLOG_PORT, ) -> logging.Logger: """Prepares the file logger Args: addr (str): host or address in the /var/log format facility (int): facility value found in common.SYSLOG_FACILITIES fmt (str): the template format datefmt (str): the date format socktype (int): socket type [defaults to socket.SOCK_DGRAM] port (str): the port [defaults to DEFAULT_SYSLOG_PORT (normally 514)] Returns: The logger instance """ # get the root logger logger = logging.getLogger('') # set the level of the root logger logger.setLevel(logging.DEBUG) # set the formatter formatter = logging.Formatter(fmt, datefmt) # handle address: if addr[0] == '/': address = addr else: address = (addr, int(port)) # create the syslog handler syslogh = handlers.SysLogHandler(address, facility, socktype) # set formatter syslogh.setFormatter(formatter) # set level syslogh.setLevel(logging.DEBUG) # add handler logger.addHandler(syslogh) # return logger return logger def _determine_facility(self, args:argparse.Namespace) -> int: """Determine the facility from the given arguments Args: args (argparse.Namespace): the CLI arguments Returns: int: the facility integer """ k = common.SYSLOG_FACILITIES.keys() if not args.facility in k: facilities = ','.join(k) print(f"You must specify a facility from the following list: {facilities}", file=sys.stderr) sys.exit(1) facility = getattr(handlers.SysLogHandler, common.SYSLOG_FACILITIES[str(args.facility)]) return facility def _determine_socktype(self, args:argparse.Namespace) -> int: """Determine the type of socket from the given arguments Args: args (argparse.Namespace): the CLI arguments Returns: int: the socket integer """ if args.tcp : socktype = socket.SOCK_STREAM else: socktype = socket.SOCK_DGRAM return socktype
[docs] def process(self, args: argparse.Namespace): """Process the command line arguments Args: args (argparser.Namespace): Command line arguments """ if args.addr is not None and args.port is None: raise ValueError("if you provide a remote or localhost address, --port must be specified") # determine the list of command line options to template in the message part # of the log record if args.options is None: options = self._determine_list_options_to_template(args) else: options = self._validate_list_options_to_template(args.options) # prepare the variables if options is not None and options != []: variables = self._prepare_variables_to_render(args) message = self._apply_template(options, variables, args.optsep) else: message = args.message # determine the format fmt = self._prepare_fmt(args) # determine the date format date_format = self._prepare_date_format(args) # determine the levelint levelint = self._prepare_level(args) # create the sys logger and obtain it facility = self._determine_facility(args) socktype = self._determine_socktype(args) # create the syslogger if not dry run if not args.dryrun: logger = self._prepare_sys_logger(args.addr, facility, fmt, date_format, socktype, args.port) if args.verbose: # create the logger and obtain it logger = self._prepare_console_logger(fmt, date_format) # log the message new_message = f"{args.program} {message}" logger.log(levelint, new_message)
[docs] def main(): """Main : CLI logic""" # parse arguments args = Console().parse_args(sys.argv[1:]) # process arguments Console().process(args)
if __name__ == "__main__": main()