"""Functions related to CLI for PSLab.
Example
-------
>>> from pslab import cli
>>> parser, subparser = cli.get_parser()
>>> cli.add_collect_args(subparser)
>>> cli.add_wave_args(subparser)
>>> cli.add_pwm_args(subparser)
>>> parser.parse_args(["collect", "logic_analyzer"])
Namespace(channels=1, duration=1, file_path=None, function='collect',
instrument='logic_analyzer', json=False, port=None)
"""
import argparse
import csv
import json
import platform
import os.path
import shutil
import sys
import time
from itertools import zip_longest
from typing import List, Tuple
import numpy as np
import pslab
import pslab.protocol as CP
from pslab.instrument.logic_analyzer import LogicAnalyzer
from pslab.instrument.oscilloscope import Oscilloscope
from pslab.instrument.waveform_generator import WaveformGenerator, PWMGenerator
from pslab.serial_handler import SerialHandler
def logic_analyzer(
    device: SerialHandler, channels: int, duration: float
) -> Tuple[List[str], List[np.ndarray]]:
    """Record logic events on up to four channels for a fixed time window.

    Parameters
    ----------
    device : :class:`SerialHandler`
        Serial interface for communicating with the PSLab device.
    channels : {1, 2, 3, 4}
        Number of channels to capture events on. Events will be captured on
        LA1, LA2, LA3, and LA4, in that order.
    duration : float
        Time in seconds to wait between starting and stopping the capture.

    Returns
    -------
    list of str
        Names of the active channels.
    list of numpy.ndarray
        Timestamps in microseconds at which logic events were detected, one
        array per channel that was used to capture events.

    Warnings
    --------
    This cannot be used at the same time as the oscilloscope.
    """
    analyzer = LogicAnalyzer(device)

    # Start a non-blocking capture; the capture window is defined by how long
    # this function sleeps before stopping the analyzer.
    analyzer.capture(channels, block=False)
    time.sleep(duration)
    analyzer.stop()
    events = analyzer.fetch_data()

    names = [analyzer._channel_one_map, analyzer._channel_two_map, "LA3", "LA4"]
    return names[:channels], events
def oscilloscope(
    device: SerialHandler, channels: int, duration: float
) -> Tuple[List[str], List[np.ndarray]]:
    """Sample voltage signals on up to four channels simultaneously.

    Samples are captured in consecutive batches, each at the minimum time gap
    the oscilloscope reports for the requested channel count, until the
    requested duration has elapsed.

    Parameters
    ----------
    device : :class:`SerialHandler`
        Serial interface for communicating with the PSLab device.
    channels : {1, 2, 4}
        Number of channels to sample from simultaneously. By default, samples
        are captured from CH1, CH2, CH3 and MIC.
    duration : float
        Duration in seconds up to which samples will be captured.

    Returns
    -------
    list of str
        "Timestamp" followed by the names of the active channels.
    list of numpy.ndarray
        Timestamps in the first index and the corresponding voltages in the
        following indices; one more entry than the number of channels used.
    """
    scope = Oscilloscope(device)
    max_samples = CP.MAX_SAMPLES // channels
    min_timegap = scope._lookup_mininum_timegap(channels)
    # Longest stretch (in seconds) a single capture can cover.
    max_duration = max_samples * min_timegap * 1e-6
    active_channels = ([scope._channel_one_map] + scope._CH234)[:channels]

    # One empty row for the timestamps plus one per channel.
    xy = [np.array([]) for _ in range(1 + channels)]
    remaining = duration

    while remaining > 0:
        samples = (
            max_samples
            if remaining >= max_duration
            else round((remaining * 1e6) / min_timegap)
        )
        started = time.time()
        xy = np.append(xy, scope.capture(channels, samples, min_timegap), axis=1)
        # Charge the wall-clock time actually spent against the budget.
        remaining -= time.time() - started

    return ["Timestamp", *active_channels], xy
# Registry of the instruments selectable from the command line, keyed by the
# name the user passes to the `collect` function.
INSTRUMENTS = dict(
    logic_analyzer=logic_analyzer,
    oscilloscope=oscilloscope,
)
def _write_collected(file, names, data, as_json):
    """Write collected channel names and data to ``file`` as CSV or JSON."""
    if as_json:
        json.dump(
            {key: val.tolist() for key, val in zip_longest(names, data)},
            file,
        )
        return

    csv_file = csv.writer(file)
    csv_file.writerow(names)
    # Channels may hold different numbers of values; zip_longest pads the
    # shorter columns so that no value is dropped.
    csv_file.writerows(zip_longest(*data))


def collect(handler: SerialHandler, args: argparse.Namespace):
    """Collect data from an instrument and write it to a file or stdout.

    The data is written as CSV unless ``args.json`` is set, in which case it
    is written as a JSON object mapping each column name to its values.

    Parameters
    ----------
    handler : :class:`SerialHandler`
        Serial interface for communicating with the PSLab device.
    args : :class:`argparse.Namespace`
        Parsed arguments. Reads ``instrument``, ``channels``, ``duration``,
        ``file_path`` and ``json``.

    Raises
    ------
    LookupError
        If the given instrument is not available.
    """
    instrument = INSTRUMENTS.get(args.instrument)

    if instrument is None:
        raise LookupError(args.instrument + " not available")

    names, data = instrument(handler, args.channels, args.duration)

    if args.file_path is None:
        _write_collected(sys.stdout, names, data, args.json)
        return

    # The context manager closes the file even if writing fails, and
    # newline="" leaves line endings to the csv module (avoids blank rows on
    # platforms that translate "\n").
    with open(args.file_path, "w", newline="") as file:
        _write_collected(file, names, data, args.json)
def wave(handler: SerialHandler, args: argparse.Namespace):
    """Generate a waveform or load a waveform table.

    Parameters
    ----------
    handler : :class:`SerialHandler`
        Serial interface for communicating with the PSLab device.
    args : :class:`argparse.Namespace`
        Parsed arguments. ``wave_function`` selects the action: "gen" reads
        ``channel``, ``frequency`` and ``phase``; "load" reads ``channel`` and
        either ``table`` or ``table_file``.

    Raises
    ------
    ValueError
        If "load" is requested without a table or table file, or if the
        table is empty.
    """
    waveform_generator = WaveformGenerator(handler)

    if args.wave_function == "gen":
        waveform_generator.generate(
            channels=args.channel,
            frequency=args.frequency,
            phase=args.phase,
        )
    elif args.wave_function == "load":
        if args.table is not None:
            table = args.table
        elif args.table_file is not None:
            with open(args.table_file) as table_file:
                table = json.load(table_file)
        else:
            raise ValueError("either a table or a table file is required")

        if not table:
            raise ValueError("waveform table must not be empty")

        # Resample the table to exactly 512 points by picking, for each output
        # point, the table entry at the proportionally scaled index.
        table_size = 512
        length = len(table)
        points = [table[i * length // table_size] for i in range(table_size)]
        waveform_generator.load_table(channel=args.channel, points=points)
def pwm(handler: SerialHandler, args: argparse.Namespace):
    """Generate PWM signals or map the reference clock to output pins.

    Parameters
    ----------
    handler : :class:`SerialHandler`
        Serial interface for communicating with the PSLab device.
    args : :class:`argparse.Namespace`
        Parsed arguments. ``pwm_function`` selects the action: "gen" reads
        ``channel``, ``frequency``, ``duty_cycles`` and ``phases``; "map"
        reads ``channel`` and ``prescaler``.
    """
    generator = PWMGenerator(handler)
    action = args.pwm_function

    if action == "gen":
        generator.generate(
            channels=args.channel,
            frequency=args.frequency,
            duty_cycles=args.duty_cycles,
            phases=args.phases,
        )
        return

    if action == "map":
        generator.map_reference_clock(
            channels=args.channel,
            prescaler=args.prescaler,
        )
def main(args: argparse.Namespace):
    """Run the function selected on the command line.

    Parameters
    ----------
    args : :class:`argparse.Namespace`
        Parsed arguments. ``function`` selects what to run; every function
        except "install" is given a serial connection opened on ``port``.
    """
    command = args.function

    # Installation does not talk to the device, so no connection is opened.
    if command == "install":
        install(args)
        return

    handler = SerialHandler(port=args.port)
    device_functions = {"collect": collect, "wave": wave, "pwm": pwm}
    action = device_functions.get(command)

    if action is not None:
        action(handler, args)
def get_parser() -> Tuple[argparse.ArgumentParser, argparse._SubParsersAction]:
    """Build the top-level parser for the CLI.

    Returns
    -------
    parser : :class:`argparse.ArgumentParser`
        Argument parser for the CLI, with the global ``--port`` option.
    functions : :class:`argparse._SubParsersAction`
        Sub-parser container to which the arguments of the individual
        functions are added. The chosen function is stored as ``function``.
    """
    port_settings = dict(
        type=str,
        default=None,
        required=False,
        help="The name of the port to which the PSLab is connected",
    )
    subparser_settings = dict(
        title="Functions",
        dest="function",
        description="Functions to perform on PSLab.",
    )

    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--port", **port_settings)
    functions = parser.add_subparsers(**subparser_settings)

    return parser, functions
def add_collect_args(subparser: argparse._SubParsersAction):
    """Register the ``collect`` function and its arguments.

    Parameters
    ----------
    subparser : :class:`argparse._SubParsersAction`
        Sub-parser container to which the ``collect`` parser is added.
    """
    available = ", ".join(INSTRUMENTS)
    parser = subparser.add_parser(
        "collect", description="Available Instruments: " + available + "."
    )

    # (flags, settings) for each argument, in the order they are registered.
    arguments = (
        (
            ("instrument",),
            dict(type=str, help="The name of the instrument to use"),
        ),
        (
            ("-c", "--channels"),
            dict(
                type=int,
                default=1,
                required=False,
                help="Number of channels to capture",
            ),
        ),
        (
            ("-d", "--duration"),
            dict(
                type=float,
                default=1,
                required=False,
                help="Duration for capturing (in seconds)",
            ),
        ),
        (
            ("-o", "--output"),
            dict(
                type=str,
                default=None,
                required=False,
                dest="file_path",
                help="File name to write data, otherwise in stdout",
            ),
        ),
        (
            ("-j", "--json"),
            dict(
                action="store_true",
                default=False,
                help="Enable it to write data in json format",
            ),
        ),
    )

    for flags, settings in arguments:
        parser.add_argument(*flags, **settings)
def add_wave_args(subparser: argparse._SubParsersAction):
    """Add arguments for the wave {gen,load} functions to the ArgumentParser.

    Parameters
    ----------
    subparser : :class:`argparse._SubParsersAction`
        Sub-parser container to which the ``wave`` parser is added.
    """
    wave = subparser.add_parser("wave")
    wave_functions = wave.add_subparsers(
        title="Wave Functions",
        dest="wave_function",
    )

    wave_gen = wave_functions.add_parser("gen")
    wave_gen.add_argument(
        "channel",
        nargs="+",
        choices=["SI1", "SI2"],
        help="Pin(s) on which to generate a waveform",
    )
    wave_gen.add_argument(
        "-f",
        "--frequency",
        nargs="+",
        type=float,
        required=True,
        help="Frequency in Hz",
    )
    wave_gen.add_argument(
        "-p",
        "--phase",
        type=float,
        default=0,
        required=False,
        help="Phase between waveforms in degrees",
    )

    description = "\n".join(
        (
            "TABLE:",
            "  JSON array of voltage values which make up the waveform. Array",
            "  length should be 512. If the array is shorter than 512, it will",
            "  be expanded to a length of 512. Values outside the range -3.3 V",
            "  to 3.3 V will be clipped.",
            "",
            "examples:",
            "  [1,0] or [1,...,0,...],",
            "  [0,1,0,-1,0,1,0,-1,...],",
            "  [0,.025,.05,.075,.1,.125,.15,...]",
        )
    )
    # The default formatter re-wraps the description into one paragraph; the
    # raw formatter keeps the TABLE section and the example list as laid out.
    load = wave_functions.add_parser(
        "load",
        description=description,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    load.add_argument(
        "channel",
        choices=["SI1", "SI2"],
        help="Pin on which to load a table",
    )

    # Exactly one source for the table must be given.
    load_table = load.add_mutually_exclusive_group(required=True)
    load_table.add_argument(
        "--table",
        type=json.loads,
        default=None,
        help="Table to load on the selected pin, as JSON",
    )
    load_table.add_argument(
        "--table-file",
        nargs="?",
        type=str,
        # With no file name the value is 0, i.e. the stdin file descriptor.
        const=0,
        default=None,
        help="JSON file holding the table to load on the selected pin. "
        "Default is stdin",
    )
def add_pwm_args(subparser: argparse._SubParsersAction):
    """Add arguments for the pwm {gen,map} functions to the ArgumentParser.

    Parameters
    ----------
    subparser : :class:`argparse._SubParsersAction`
        Sub-parser container to which the ``pwm`` parser is added.
    """
    pwm = subparser.add_parser("pwm")
    pwm_functions = pwm.add_subparsers(
        title="PWM Functions",
        dest="pwm_function",
    )

    pwm_gen = pwm_functions.add_parser("gen")
    pwm_gen.add_argument(
        "channel",
        nargs="+",
        choices=["SQ1", "SQ2", "SQ3", "SQ4"],
        help="Pin(s) on which to generate a PWM signals",
    )
    pwm_gen.add_argument(
        "-f",
        "--frequency",
        type=float,
        required=True,
        help="Frequency in Hz. Shared by all outputs",
    )
    pwm_gen.add_argument(
        "-d",
        "--duty-cycles",
        nargs="+",
        type=float,
        required=True,
        help="Duty cycle between 0 and 1",
    )
    pwm_gen.add_argument(
        "-p",
        "--phases",
        nargs="+",
        type=float,
        default=0,
        required=False,
        help="Phase between 0 and 1",
    )

    map_ = pwm_functions.add_parser("map")
    map_.add_argument(
        "channel",
        nargs="+",
        choices=["SQ1", "SQ2", "SQ3", "SQ4"],
        help="Digital output pin(s) to which to map the internal oscillator",
    )
    map_.add_argument(
        "-p",
        "--prescaler",
        type=int,
        required=True,
        # Trailing space keeps the two sentences apart in the rendered help.
        help="Prescaler value in interval [0, 15]. "
        "The output frequency is 128 / (1 << prescaler) MHz",
    )
def cmdline(args: List[str] = None):
    """Entry point of the pslab command line.

    Builds the parser with every available function, parses the arguments
    and runs the selected function.

    Parameters
    ----------
    args : list of strings.
        Arguments to parse. Defaults to the arguments of the running process.
    """
    argv = sys.argv[1:] if args is None else args
    parser, subparser = get_parser()

    registrars = (
        add_collect_args,
        add_wave_args,
        add_pwm_args,
        add_install_args,
    )
    for register in registrars:
        register(subparser)

    main(parser.parse_args(argv))
def install(args: argparse.Namespace):
    """Install the udev rule on Linux.

    Nothing is installed on other systems. On Linux the rule is installed
    when the serial access check fails, or when ``args.force`` is set.

    Parameters
    ----------
    args : :class:`argparse.Namespace`
        Parsed arguments. Reads ``force``.
    """
    system = platform.system()

    if system != "Linux":
        print(f"Installation not required on {system}.")
        return

    try:
        SerialHandler.check_serial_access_permission()
    except OSError:
        # No serial access yet: the rule is needed.
        _install()
        return

    if args.force:
        _install()
    else:
        print("User is in dialout/uucp group or udev rule is already installed.")
def _install():
    """Copy the udev rules file from the pslab package to /etc/udev/rules.d."""
    package_dir = pslab.__path__[0]
    shutil.copyfile(
        os.path.join(package_dir, "99-pslab.rules"),
        "/etc/udev/rules.d/99-pslab.rules",
    )
def add_install_args(subparser: argparse._SubParsersAction):
    """Register the ``install`` function and its arguments.

    Parameters
    ----------
    subparser : :class:`argparse._SubParsersAction`
        Sub-parser container to which the ``install`` parser is added.
    """
    force_settings = dict(
        action="store_true",
        default=False,
        help="Overwrite existing udev rule.",
    )
    parser = subparser.add_parser("install")
    parser.add_argument("-f", "--force", **force_settings)