# Start-up sequence: create the quote and trade clients, connect them, store
# the session keys in globals, and start one subscriber thread per channel.
# NOTE(review): the `global` declarations, and the use of quote_sub_th /
# trade_sub_th before their definitions further down, suggest these statements
# are the body of a function whose header and indentation are not visible
# here — TODO confirm against the full file.
global g_TradeZMQ
global g_QuoteZMQ
# One tcore_zmq client per channel (quote and trade).
# NOTE(review): "SystemName", "ServiceKey" and "port" look like placeholder
# values to be replaced with real settings — confirm.
g_QuoteZMQ = tcore_zmq("SystemName","ServiceKey")
g_TradeZMQ = tcore_zmq("SystemName","ServiceKey")
# The connect results are read below for their "SessionKey" and "SubPort" entries.
q_data = g_QuoteZMQ.quote_connect("port")
t_data = g_TradeZMQ.trade_connect("port")
global g_QuoteSession
global g_TradeSession
# Session keys are kept in globals because the subscriber threads pass them
# to QuotePong / TradePong when answering PING messages.
g_QuoteSession = q_data["SessionKey"]
g_TradeSession = t_data["SessionKey"]
# Create a quote thread (it receives the whole connect result).
t1=threading.Thread(target = quote_sub_th,args=(g_QuoteZMQ,q_data,))
t1.start()
# Create a trade thread (it receives only the subscription port).
t2 = threading.Thread(target = trade_sub_th,args=(g_TradeZMQ,t_data["SubPort"],))
t2.start()
# Handler function for the quote callback thread
def quote_sub_th(obj,q_data,filter = ""):
    """Receive quote frames on a SUB socket and dispatch them by "DataType".

    Each frame is expected to look like ``<symbol>:<json>`` followed by one
    trailing byte, which is dropped before decoding.

    Args:
        obj: Client object providing ``context`` (used to create the SUB
            socket) and ``get_history`` (used for "1K" history queries).
        q_data: Mapping holding the "SubPort" to connect to and the
            "SessionKey" passed to ``get_history``.
        filter: Subscription prefix handed to ``zmq.SUBSCRIBE``; the default
            empty string subscribes to every message.

    The loop never returns. Frames that cannot be decoded, have no ":"
    separator, or carry invalid JSON are reported and skipped, so a single
    bad frame cannot terminate the thread (and with it the PING replies).
    """
    socket_sub = obj.context.socket(zmq.SUB)
    # No receive timeout is configured, so recv() blocks until a frame arrives.
    socket_sub.connect("tcp://127.0.0.1:%s" % q_data["SubPort"])
    socket_sub.setsockopt_string(zmq.SUBSCRIBE, filter)
    while True:
        raw = socket_sub.recv()
        try:
            symbol, message = _split_quote_frame(raw)
        except ValueError as err:
            print("quote_sub_th: skipping malformed frame: %s" % err)
            continue

        data_type = message["DataType"]
        if data_type == "PING":
            # Answer the keep-alive using the globally stored quote session.
            g_QuoteZMQ.QuotePong(g_QuoteSession)
        elif data_type == "REALTIME":
            OnRealTimeQuote(message["Quote"])
        elif data_type == "GREEKS":
            OnGreeks(message["Quote"])
        elif data_type == "1K":
            print("@@@@@@@@@@@@@@@@@@@@@@@",message)
            _print_history_pages(obj, q_data["SessionKey"], symbol, message)


def _split_quote_frame(raw):
    """Split one raw quote frame into ``(symbol, decoded JSON payload)``.

    Raises:
        ValueError: if the frame is not valid UTF-8, has no ":" separator,
            or its payload is not valid JSON (``UnicodeDecodeError`` and
            ``json.JSONDecodeError`` are both ``ValueError`` subclasses).
    """
    # The last byte of every frame is dropped before decoding.
    text = raw[:-1].decode("utf-8")
    # Text before the first ":" is the symbol; everything after it is JSON.
    symbol, separator, payload = text.partition(":")
    if not separator:
        raise ValueError("no ':' separator in %r" % text)
    return symbol, json.loads(payload)


def _print_history_pages(obj, session_key, symbol, message):
    """Query and print every page of "1K" history announced by *message*."""
    qry_index = ""
    while True:
        request = {
            "Symbol": symbol,
            "SubDataType": "1K",
            "StartTime": message["StartTime"],
            "EndTime": message["EndTime"],
            "QryIndex": qry_index,
        }
        page = obj.get_history(session_key, request)["HisData"]
        if not page:
            # An empty (or missing) page ends the pagination.
            break
        for bar in page:
            print("历史行情:Time:%s, Volume:%s, QryIndex:%s" % (bar["Time"], bar["Volume"], bar["QryIndex"]))
        # The next request continues from the last record of this page.
        qry_index = page[-1]["QryIndex"]
# Handler function for the trade callback thread
def trade_sub_th(obj,sub_port,filter = ""):
    """Receive trade frames on a SUB socket and dispatch them by "DataType".

    Args:
        obj: Client object whose ``context`` is used to create the SUB socket.
        sub_port: Local TCP port of the trade publisher to connect to.
        filter: Subscription prefix handed to ``zmq.SUBSCRIBE``.

    The loop never returns. Empty frames are ignored; for every other frame
    the last byte is dropped and the remainder is parsed as JSON.
    """
    subscriber = obj.context.socket(zmq.SUB)
    # No receive timeout is configured, so recv() blocks until a frame arrives.
    subscriber.connect("tcp://127.0.0.1:%s" % sub_port)
    subscriber.setsockopt_string(zmq.SUBSCRIBE, filter)
    while True:
        frame = subscriber.recv()
        if not frame:
            continue

        event = json.loads(frame[:-1])
        kind = event["DataType"]
        if kind == "PING":
            # Answer the keep-alive using the globally stored trade session.
            g_TradeZMQ.TradePong(g_TradeSession)
        elif kind == "ACCOUNTS":
            for account in event["Accounts"]:
                OnGetAccount(account)
        elif kind == "EXECUTIONREPORT":
            OnexeReport(event["Report"])
        elif kind == "FILLEDREPORT":
            RtnFillReport(event["Report"])