Python OpenOPC的学习观感,,最近由于公司项目让我


最近由于公司项目让我研究工业标准协议OPC,开始的设计是在Linux环境用C/C++进行编写。但是经过几天的努力也没有发现可用的免费开发包,网上的收费开发包如:Softing公司的Toolkit和Matrikon公司的SDK是可以进行Linux环境下的开发。由于公司为了节省成本,又转向其他方向----需求免费开源的开发工具。网上免费开放的开发都是基于Windows下的VB、VC和C#,因为opc需要调用windows下的DCOM组件,由于这个原因成为了Linux环境下开发的主要障碍。最后我找到了OpenOPC:

OpenOPC for Python is a free, open source OPC (OLE for Process Control) toolkit designed for use with the popular Python programming language. The unique features that set it apart from the many commercially available OPC toolkits include...

This project utilizes the de facto OPC-DA (Win32 COM-based) industrial automation standard. If you are looking for an OPC XML-DA library for Python, then please visit thePyOPCproject.

下面我就介绍一下OpenOPC开发工具的使用,因为我发现网上这方面的资料太少了(当然你也可以看OpenOPC的官网说明http://openopc.sourceforge.net/about.html,不过内容很少)。

首先,我们来谈谈OpenOPC-1.3.1.win32-py2.7.exe:

当你安装完毕OpenOPC-1.3.1.win32-py2.7.exe时,在你的安装路径可以看到如下目录:

技术分享

bin目录下有两个文件:opc.exe和OpenOPCService.exe,这两个文件是src目录下的opc.py和OpenOPCService.py生成的,他们是可以在命令提示符下直接运行的,具体操作可以查看README.txt文件;

doc目录下是OpenOPC的说明文件,通过它你可以了解到OpenOPC的具体使用方法,主要是测试工具opc.py或opc.exe文件的使用;

lib目录下主要是OpenOPC所要调用的库文件gbda_aut.dll;

src目录下包括opc.py、OpenOPCService.py、OpenOPC.py、SystemHealth.py;

其次,我们来谈谈测试OPC server-----MatrikonOPCSimulation

点击MatrikonOPCSimulation.exe文件对其进行安装,安装完毕后需要将其服务器的名字设置到环境变量中:OPC_SERVER=matrikon.OPC.simulation

启动MatrikonOPC server of simulation软件设置其属性就OK了,具体操作请查看软件文档;

我们再来谈谈OpenOPC的原理:

从测试demo---opc.py中我们可以看到它的启动方式有两种:docm和open。dcom方式是直接调用本机上的DCOM组件,而open方式是通过zzzopenopcservice(OpenOPC Gateway Service on Windows)来调用windows上的DCOM组件,因此你在调用时要认真区分;

zzzopenopcservice的启动,详情请查看OpenOPC/doc/OpenOPC.pdf文档;

你还可以通过opc.py上的不同命令测试拉取MatrikonOPC server of simulation上的数据信息;

最后,我们谈谈MACSV系统OPC Server通信软件的使用和测试代码的应用实例:

安装MACSV系统OPC Server通信软件,并将其服务器的名字设置到环境变量中:OPC_SERVER=Hollysys.MACSV5OPCServer;

启动MACSV5OPCServer,设置其属性;

测试代码opctest1.py如下:

from sys import *from getopt import *from os import *import signalimport sysimport osimport typesimport datetimeimport re, time, csvimport OpenOPCimport Pyroopc_class='Matrikon.OPC.Automation;Graybox.OPC.DAWrapper;HSCOPC.Automation;RSI.OPCAutomation;OPC.Automation'client_name='OpenOPC'opc_server='Hollysys.MACSV5OPCServer'opc_host='127.0.0.1'taglist=['Server1.Group0.opc_01','Server1.Group0.opc_02','Server1.Group1.rule1','Server1.Group1.rule2','Server2.Group0.sg1','Server2.Group0.sg2','Server2.Group1.sg2','Server2.Group1.sg1']class SigHandler:    def __init__(self):        self.signaled = 0        self.sn = None    def __call__(self, sn, sf):        self.sn = sn        self.signaled += 1# Establish signal handler for keyboard interruptsdef signalhandle():    sh = SigHandler()    signal.signal(signal.SIGINT,sh)    if os.name == 'nt':        signal.signal(signal.SIGBREAK,sh)    signal.signal(signal.SIGTERM,sh)    return shdef test01():    opc = OpenOPC.client(opc_class, client_name)    print "###  create opc!"    opc.connect(opc_server, opc_host)    print "###  connect opc server:", opc_server    test02(opc)    taglist_opc = opc.read(taglist)    info_opc = opc.info()    servers_opc = opc.servers(opc_host)    tags = []    data = opc.list(tags, flat=True)    list_opc = opc.list    list_data = list_opc(tags)    opc.close()    print "### close opc!"    for i in range(len(taglist_opc)):        (name, val, qual, time) = taglist_opc[i]        print 'name:', name        print 'val:', val        print 'qual:', qual        print 'time:', time        print    print "value:", str(list(taglist_opc))    print "info:", str(list(info_opc))    print "servers:", str(list(servers_opc))    print "data:", str(list(data))    print "list:", str(list(list_data))def test02(opc):    count0 = 0    sh = signalhandle()    while not sh.signaled:        print "test02...    ", count0        data_opc = opc.read(tags=['Server1.Group0.opc_01'],                            #group='test',                            #size=group_size,                            #pause=tx_pause,                            #source=data_source,                            update=1000,                            #timeout=timeout,                            #sync=sync,                            #include_error=include_err_msg)                            )        print "[test02] data:", str(list(data_opc))        try:            time.sleep(1)        except IOError:            break        count0 += 1def main():    test01()if __name__ == '__main__':    main()

具体开发包和测试代码可以到http://download.csdn.net/detail/cl185303590/8821585进行下载。





Python OpenOPC的学习观感

评论关闭