Implement Keysight N1914a plugin.
N1914a is a power meter with SCPI interface. The implementation of
the plugin is referenced by the Keysight official document:
N1913A/N1914A EPM series Power Meter Programming Guide.
BUG=b:27818897
TEST=manually
Change-Id: Ia40db44bc22ad169f8578a56c4e61159eb72a333
diff --git a/graphyte/inst/keysight_n1914a/__init__.py b/graphyte/inst/keysight_n1914a/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/graphyte/inst/keysight_n1914a/__init__.py
diff --git a/graphyte/inst/keysight_n1914a/keysight_n1914a.py b/graphyte/inst/keysight_n1914a/keysight_n1914a.py
new file mode 100644
index 0000000..90173a9
--- /dev/null
+++ b/graphyte/inst/keysight_n1914a/keysight_n1914a.py
@@ -0,0 +1,225 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+# Copyright 2016 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""The Keysight N1914a power meter.
+
+Reference: Keysight official document
+ N1913A/N1914A EPM series Power Meter Programming Guide.
+"""
+
+from .. import InstBase
+from ...default_setting import logger
+from ...testplan import ChainMaskToList
+
+
+DEFAULT_LINK_OPTIONS = {
+ 'link_class': 'SCPILink',
+ 'host': '10.3.0.60',
+ 'port': 5025
+}
+
+
+class Inst(InstBase):
+ name = 'Keysight N1914a'
+ need_link = True
+
+ def __init__(self, inst_port=1, range_setting='AUTO', avg_count=0,
+ continuous_trigger=False, trigger_source='IMMediate', **kwargs):
+ """Initialize method.
+
+ Args:
+ inst_port: the port of the power meter. 1 or 2.
+ range_setting: 0, 1, or 'AUTO'.
+ avg_count: integer or 'AUTO'.
+ continuous_trigger: enable continuous trigger or not. boolean.
+ trigger_source: the trigger source. string.
+ """
+ kwargs.setdefault('link_options', {})
+ for key, value in DEFAULT_LINK_OPTIONS.items():
+ kwargs['link_options'].setdefault(key, value)
+ super(Inst, self).__init__(**kwargs)
+ self.current_pathloss = None
+
+ assert inst_port in [1, 2]
+ assert range_setting in [0, 1, 'AUTO']
+ assert isinstance(avg_count, int) or avg_count == 'AUTO'
+
+ self.inst_port = inst_port
+ self.range_setting = range_setting
+ self.avg_count = avg_count
+ self.continuous_trigger = continuous_trigger
+ self.trigger_source = trigger_source
+
+ self.controllers = {
+ 'WLAN': RFController(self),
+ 'BLUETOOTH': RFController(self),
+ '802_15_4': RFController(self)}
+
+ def Initialize(self):
+ logger.info('Inst Initialize')
+ self.link.CheckReady()
+ # Choose SCPI language
+ self.SendCommand('SYSTem:LANGuage SCPI')
+ self.SendCommand('*RST')
+
+ # Set range and average filter
+ self.SendCommand('UNIT%d:POWer DBM' % self.inst_port)
+ self.SendCommand('FORM ASCii')
+ self._SetRange(self.range_setting)
+ self._SetAverageFilter(self.avg_count)
+ self._SetContinuousTrigger(self.continuous_trigger)
+ self._SetTriggerMode(self.trigger_source)
+
+ def Terminate(self):
+ pass
+
+ def SelfCalibrate(self, calibration_type='full'):
+ pass
+
+ def _SetPortConfig(self, port_mapping, pathloss):
+ self.current_pathloss = pathloss
+
+ def LockInstrument(self):
+ # Multi-DUT testing is not supported, ignore this method.
+ pass
+
+ def UnlockInstrument(self):
+ # Multi-DUT testing is not supported, ignore this method.
+ pass
+
+ def QueryCommand(self, command):
+ self.link.CheckCall('*CLS')
+ output = self.link.CheckOutput(command)
+ # *ESR? returns the error string. We do this to make sure that we can
+ # detect an unknown header rather than just waiting forever.
+ check_str = self.link.CheckOutput('*ESR?')
+ if ',' in check_str:
+ raise ValueError('Error issuing command %s while calling *ESR?: %s' %
+ (command, check_str))
+ # Success! Get SYST:ERR, which should be +0
+ check_str = self.link.CheckOutput('SYST:ERR?')
+ if check_str != '+0,"No error"':
+ raise ValueError('Error issuing command %s while calling SYST:ERR?: %s' %
+ (command, check_str))
+ return output
+
+ def SendCommand(self, command):
+ self.link.CheckCall('*CLS')
+ self.link.CheckCall(command)
+ # Success! Get SYST:ERR, which should be +0
+ check_str = self.link.CheckOutput('SYST:ERR?')
+ if check_str != '+0,"No error"':
+ raise ValueError('Error issuing command %s while calling SYST:ERR?: %s' %
+ (command, check_str))
+ check_str = self.link.CheckOutput('*OPC?')
+ if int(check_str) != 1:
+ raise ValueError('Expected 1 after *OPC? but got %s' % check_str)
+
+ def _SetRange(self, range_setting):
+ """Selects a sensor's range (lower or upper).
+
+ Args:
+ range_setting: None to enable auto-range feature. To speed up the
+ measurement, caller can specify the range manually based on the
+ expected power. To manually set the range, use 0 to indicate a
+ lower range and 1 for the upper range. Because range definition
+ varies from sensor to sensor, check the manual before using this
+ function.
+ """
+ assert range_setting in ['AUTO', 0, 1]
+ if range_setting == 'AUTO':
+ self.SendCommand('SENSe%d:POWer:AC:RANGe:AUTO 1' % self.inst_port)
+ else:
+ self.SendCommand('SENSe%d:POWer:AC:RANGe:AUTO 0' % self.inst_port)
+ self.SendCommand('SENSe%d:POWer:AC:RANGe %d' %
+ (self.inst_port, range_setting))
+
+ def _SetAverageFilter(self, avg_count):
+ """Sets the average filter.
+
+ There are three different average filters available, averaging disable,
+ auto averaging and average with a specific window length.
+
+ Args:
+ avg_length: Use 'AUTO' for auto averaging, 0 for averaging disable, and
+ other positive numbers for specific window length.
+ """
+ if avg_count == 'AUTO':
+ self.SendCommand('SENSe%d:AVERage:COUNt:AUTO ON' % self.inst_port)
+ elif avg_count == 0:
+ # Disable the average filter.
+ self.SendCommand('SENSe%d:AVERage:STATe 0' % self.inst_port)
+ elif isinstance(avg_count, int):
+ self.SendCommand('SENSe%d:AVERage:COUNt:AUTO OFF' % self.inst_port)
+ self.SendCommand('SENSe%d:AVERage:COUNt %d' %
+ (self.inst_port, avg_count))
+ else:
+ raise ValueError('Invalid avg_count setting [%s]' % avg_count)
+
+ def _SetContinuousTrigger(self, enable):
+ status = 'ON' if enable else 'OFF'
+ self.SendCommand('INITiate%d:CONTinuous %s' % (self.inst_port, status))
+
+ def _SetTriggerMode(self, trigger_source):
+ self.SendCommand('TRIGger%d:SOURce %s' %
+ (self.inst_port, trigger_source))
+
+ def SetFrequency(self, freq_mhz):
+ logger.info('Set frequency to %d MHz', freq_mhz)
+ self.SendCommand('SENSe%d:FREQuency %sMHZ' % (self.inst_port, freq_mhz))
+ self.SendCommand('SENSe%d:CORRection:GAIN2:STATe 0' % self.inst_port)
+
+ def FetchPower(self):
+ try:
+ self.SendCommand('INITiate%d:IMMediate' % self.inst_port)
+ result = float(self.QueryCommand('FETch%d?' % self.inst_port))
+ logger.debug('Fetch power: %s', result)
+ return result
+ except Exception:
+ logger.exception('Fetch power failed.')
+ return float('-inf')
+
+
+class RFController(InstBase.ControllerBase):
+ def __init__(self, inst=None):
+ super(RFController, self).__init__(inst)
+ self.result = None
+
+ def _Initialize(self):
+ pass
+
+ def _Terminate(self):
+ pass
+
+ def _TxConfig(self, center_freq, **kwargs):
+ self.inst.SendCommand('ABORt')
+ self.inst.SetFrequency(center_freq)
+
+ def _TxMeasure(self, center_freq, **kwargs):
+ power = self.inst.FetchPower()
+ if 'chain_mask' in kwargs: # WLAN case
+ antenna_indice = ChainMaskToList(kwargs['chain_mask'])
+ result = {}
+ for idx in antenna_indice:
+ result[idx] = power + self.inst.current_pathloss[idx][center_freq]
+ else:
+ result = power + self.inst.current_pathloss[0][center_freq]
+
+ self.result = {'avg_power': result}
+
+ def _TxGetResult(self, result_limit, **kwargs):
+ ret = self.result
+ self.result = None
+ return ret
+
+ def _RxConfig(self, **kwargs):
+ raise NotImplementedError
+
+ def _RxGenerate(self, rx_num_packets, power_level, **kwargs):
+ raise NotImplementedError
+
+ def _RxStop(self, **kwargs):
+ raise NotImplementedError
diff --git a/graphyte/inst/keysight_n1914a/keysight_n1914a_config.json b/graphyte/inst/keysight_n1914a/keysight_n1914a_config.json
new file mode 100644
index 0000000..1525a55
--- /dev/null
+++ b/graphyte/inst/keysight_n1914a/keysight_n1914a_config.json
@@ -0,0 +1,8 @@
+{
+ "link_options": {
+ "link_class": "SCPILink",
+ "host": "10.3.0.60",
+ "port": 5025
+ },
+ "inst_port": 1
+}
diff --git a/graphyte/link.py b/graphyte/link.py
index e8ed9e8..9f7b5ff 100644
--- a/graphyte/link.py
+++ b/graphyte/link.py
@@ -18,8 +18,13 @@
'GPIBLink': 'links.gpib'}
+class LinkNotReady(Exception):
+ """Exception for link is not ready."""
+ pass
+
+
class LinkOptionsError(Exception):
- """Exception for invalid DUT options."""
+ """Exception for invalid link options."""
pass
@@ -66,6 +71,10 @@
"""
raise NotImplementedError
+ def CheckReady(self):
+ if not self.IsReady():
+ raise LinkNotReady
+
def IsReady(self):
"""Checks if DUT is ready for connection.