def _print_1k_history(obj, session_key, symbol, start_time, end_time):
    """Page through the 1K history of *symbol* and print every returned row.

    ``obj.get_history`` is called repeatedly; each request carries the
    ``QryIndex`` of the last row printed so far (an empty string on the first
    request). Paging stops as soon as a response has no ``HisData`` rows.
    """
    qry_index = ""
    while True:
        request = {
            "Symbol": symbol,
            "SubDataType": "1K",
            "StartTime": start_time,
            "EndTime": end_time,
            "QryIndex": qry_index,
        }
        rows = obj.get_history(session_key, request)["HisData"]
        if not rows:
            break
        for row in rows:
            print("历史行情:Time:%s, Volume:%s, QryIndex:%s" % (row["Time"], row["Volume"], row["QryIndex"]))
            # After the loop this holds the last row's index: the next cursor.
            qry_index = row["QryIndex"]


def quote_sub_th(obj, q_data, filter=""):
    """Quote callback thread function: receive and dispatch quote messages.

    Opens a ZeroMQ SUB socket on ``obj.context``, connects it to
    ``tcp://127.0.0.1:<q_data["SubPort"]>`` and subscribes with *filter* as the
    topic prefix. Each received frame is expected to look like
    ``<symbol>:<json>``; the JSON object's ``DataType`` selects the handler:

    * ``"PING"``     -> ``g_QuoteZMQ.QuotePong(g_QuoteSession)``
    * ``"REALTIME"`` -> ``OnRealTimeQuote(message["Quote"])``
    * ``"GREEKS"``   -> ``OnGreeks(message["Quote"])``
    * ``"1K"``       -> fetch and print the 1K history between the message's
      ``StartTime`` and ``EndTime`` using ``q_data["SessionKey"]``

    Any other ``DataType`` is ignored. The loop has no exit condition, so the
    function only ends when an exception propagates; the socket is closed in
    that case.

    Args:
        obj: Object providing ``context`` (a ZeroMQ context) and
            ``get_history(session_key, request)``.
        q_data: Mapping with at least ``"SubPort"`` and ``"SessionKey"``.
        filter: Subscription prefix passed to ``zmq.SUBSCRIBE``; the empty
            string subscribes to everything.
    """
    socket_sub = obj.context.socket(zmq.SUB)
    try:
        socket_sub.connect("tcp://127.0.0.1:%s" % q_data["SubPort"])
        socket_sub.setsockopt_string(zmq.SUBSCRIBE, filter)
        while True:
            # The final byte of every frame is dropped before decoding
            # (presumably a trailing terminator byte -- confirm with the publisher).
            text = socket_sub.recv()[:-1].decode("utf-8")

            # Split "<symbol>:<json>" at the first colon only; the JSON
            # payload itself contains further colons.
            symbol, separator, payload = text.partition(":")
            if not separator:
                # A frame without the topic separator cannot be parsed; report
                # it and keep the subscriber thread alive.
                print("quote_sub_th: skipping frame without ':' separator: %r" % text)
                continue

            message = json.loads(payload)
            data_type = message["DataType"]
            if data_type == "PING":
                g_QuoteZMQ.QuotePong(g_QuoteSession)
            elif data_type == "REALTIME":
                OnRealTimeQuote(message["Quote"])
            elif data_type == "GREEKS":
                OnGreeks(message["Quote"])
            elif data_type == "1K":
                print("@@@@@@@@@@@@@@@@@@@@@@@",message)
                _print_1k_history(
                    obj,
                    q_data["SessionKey"],
                    symbol,
                    message["StartTime"],
                    message["EndTime"],
                )
    finally:
        # Release the socket when the receive loop is aborted by an exception.
        socket_sub.close()