Running into some problems tryign to get the TDA streaming API up and running

Trying to get the streaming API up and running to stream some futures quotes, heres my script:

import configparser, requests, json, urllib, dateutil.parser, websockets, asyncio, pyodbc, nest_asyncio from datetime import datetime def credentials(): global client_ID global acc_number global password global access_token config = configparser.ConfigParser() config.read('./lib/config.txt') client_ID = config.get('configuration','key') acc_number = config.get('configuration','td_acc_number') password = config.get('configuration','td_pass') with open('./lib/oauth_access.json') as file: token = json.load(file) access_token = (token['access_token']) def unix_time_millis(dt): epoch = datetime.utcfromtimestamp(0) return (dt - epoch).total_seconds() * 1000.0 def user_principals(): global userPrincipalsResponse global credentials endpoint = 'https://api.tdameritrade.com/v1/userprincipals' headers = {'Authorization' : 'Bearer {}'.format(access_token)} params = {'fields':'streamerSubscriptionKeys,streamerConnectionInfo'} content = requests.get(url = endpoint, params = params, headers = headers) userPrincipalsResponse = content.json() # we need to get the timestamp in order to make our next request, but it needs to be parsed tokenTimeStamp = userPrincipalsResponse['streamerInfo']['tokenTimestamp'] date = dateutil.parser.parse(tokenTimeStamp, ignoretz = True) tokenTimeStampAsMs = unix_time_millis(date) # we need to define our credentials that we will need to make our stream credentials = {"userid": userPrincipalsResponse['accounts'][0]['accountId'], "token": userPrincipalsResponse['streamerInfo']['token'], "company": userPrincipalsResponse['accounts'][0]['company'], "segment": userPrincipalsResponse['accounts'][0]['segment'], "cddomain": userPrincipalsResponse['accounts'][0]['accountCdDomainId'], "usergroup": userPrincipalsResponse['streamerInfo']['userGroup'], "accesslevel":userPrincipalsResponse['streamerInfo']['accessLevel'], "authorized": "Y", "timestamp": int(tokenTimeStampAsMs), "appid": userPrincipalsResponse['streamerInfo']['appId'], "acl": userPrincipalsResponse['streamerInfo']['acl'] } def build_request(): # define a request login_request = {"requests": [{"service": "ADMIN", "requestid": "0", "command": "LOGIN", "account": userPrincipalsResponse['accounts'][0]['accountId'], "source": userPrincipalsResponse['streamerInfo']['appId'], "parameters": {"credential": urllib.parse.urlencode(credentials), "token": userPrincipalsResponse['streamerInfo']['token'], "version": "1.0"}}]} # define a request for different data sources data_request= {"requests": [{"service": "LEVELONE_FUTURES", "requestid": "2", "command": "SUBS", "account": userPrincipalsResponse['accounts'][0]['accountId'], "source": userPrincipalsResponse['streamerInfo']['appId'], "parameters": {"keys": "/ES", "fields": "0,1,2,3,4,5,8,9,10,11,14,18,19,20,22,23,24,25,26,34,35"}}]} # create it into a JSON string, as the API expects a JSON string. global login_encoded global data_encoded login_encoded = json.dumps(login_request) data_encoded = json.dumps(data_request) class WebSocketClient(): def __init__(self): self.data_holder = [] self.file = open('td_ameritrade_data.txt', 'a') self.cnxn = None self.crsr = None def database_connect(self): # define the server and the database server = 'DESKTOP-26UTDS9\SQLEXPRESS' database = 'futures_db' sql_driver = '{ODBC Driver 17 for SQL Server}' # define our connection, autocommit MUST BE SET TO TRUE, also we can edit data. self.cnxn = pyodbc.connect(driver = sql_driver, server = server, database = database, trusted_connection ='yes') self.crsr = self.cnxn.cursor() def database_insert(self, query, data_tuple): # execute the query, commit the changes, and close the connection self.crsr.execute(query, data_tuple) self.cnxn.commit() self.cnxn.close() print('Data has been successfully inserted into the database.') async def connect(self): ''' Connecting to webSocket server websockets.client.connect returns a WebSocketClientProtocol, which is used to send and receive messages ''' # define the URI of the data stream, and connect to it. uri = "wss://" + userPrincipalsResponse['streamerInfo']['streamerSocketUrl'] + "/ws" self.connection = await websockets.client.connect(uri) # if all goes well, let the user know. if self.connection.open: print('Connection established. Client correctly connected') return self.connection async def sendMessage(self, message): ''' Sending message to webSocket server ''' await self.connection.send(message) async def receiveMessage(self, connection): ''' Receiving all server messages and handle them ''' while True: try: # grab and decode the message message = await connection.recv() message_decoded = json.loads(message) # prepare data for insertion, connect to database query = """INSERT INTO es_contracts_data (service, timestamp, command, symbol, bid, ask, last, bid_size, ask_size, total_vol, last_size, quote_time, trade_time, prior_day_close, day_net_change, percent_change, status, OI, mark_price, tick, tick_amt, active_contract, exp_date) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);""" self.database_connect() # check if the response contains a key called data if so then it contains the info we want to insert. if 'data' in message_decoded.keys(): # grab the data data = message_decoded['data'][0] data_tuple = (data['service'], str(data['timestamp']), data['command'], data['content'][0]['key'], data['content'][0]['1'], data['content'][0]['2'], data['content'][0]['3'], data['content'][0]['4'], data['content'][0]['5'], data['content'][0]['8'], data['content'][0]['9'], data['content'][0]['10'], data['content'][0]['11'], data['content'][0]['14'], data['content'][0]['19'], data['content'][0]['20'], data['content'][0]['22'], data['content'][0]['23'], data['content'][0]['24'], data['content'][0]['25'], data['content'][0]['26'], data['content'][0]['34'], data['content'][0]['35']) # insert the data self.database_insert(query, data_tuple) print('-'*20) print('Received message from server: ' + str(message)) except websockets.exceptions.ConnectionClosed: print('Connection with server closed') break async def heartbeat(self, connection): ''' Sending heartbeat to server every 5 seconds Ping - pong messages to verify connection is alive ''' while True: try: await connection.send('ping') await asyncio.sleep(5) except websockets.exceptions.ConnectionClosed: print('Connection with server closed') break credentials() user_principals() build_request() nest_asyncio.apply() if __name__ == '__main__': # Creating client object client = WebSocketClient() loop = asyncio.get_event_loop() # Start connection and get client connection protocol connection = loop.run_until_complete(client.connect()) # Start listener and heartbeat tasks = [asyncio.ensure_future(client.receiveMessage(connection)), asyncio.ensure_future(client.sendMessage(login_encoded)), asyncio.ensure_future(client.receiveMessage(connection)), asyncio.ensure_future(client.sendMessage(data_encoded)), asyncio.ensure_future(client.receiveMessage(connection))] loop.run_until_complete(asyncio.wait(tasks)) 

It's basically the script from here: https://github.com/areed1192/sigma_coding_youtube/blob/master/python/python-finance/td-ameritrade/TD%20Streaming%20API.ipynb

Anyways, it was working pretty well and then I left to go to hockey. I come back and it all seems to be going well for the first 5 seconds. I get the correct output to the console:

Connection established. Client correctly connected -------------------- Received message from server: {"response":[{"service":"ADMIN","requestid":"0","command":"LOGIN","timestamp":1598487348238,"content":{"code":0,"msg":"04-4"}}]} -------------------- Received message from server: {"notify":[{"heartbeat":"1598487348301"}]} -------------------- Received message from server: {"response":[{"service":"LEVELONE_FUTURES","requestid":"2","command":"SUBS","timestamp":1598487348301,"content":{"code":0,"msg":"SUBS command succeeded"}}]} Data has been successfully inserted into the database. -------------------- Received message from server: {"data":[{"service":"LEVELONE_FUTURES", "timestamp":1598487348361,"command":"SUBS","content":[{"key":"/ES","delayed":false,"assetMainType":"FUTURE","1":3475,"2":3475.5,"3":3475.25,"4":37,"5":27,"8":15827,"9":1,"10":1598487347421,"11":1598487347303,"14":3480.25,"18":3479.5,"19":-5,"20":-0.0014,"22":"Unknown","23":2663491,"24":3475.25,"25":0.25,"26":12.5,"34":"/ESU20","35":1600401600000}]}]} 

But then I get smacked with this error:

Task exception was never retrieved future: <Task finished coro=<WebSocketClient.receiveMessage() done, defined at C:\Users\logan\OneDrive\Desktop\Financial\Project X-Ray\websocket_streaming.py:147> exception=RuntimeError('cannot call recv while another coroutine is already waiting for the next message')> Traceback (most recent call last): File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python37_64\lib\asyncio\tasks.py", line 249, in __step result = coro.send(None) File "C:\Users\logan\OneDrive\Desktop\Financial\Project X-Ray\websocket_streaming.py", line 155, in receiveMessage message = await connection.recv() File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python37_64\lib\site-packages\websockets\protocol.py", line 474, in recv "cannot call recv while another coroutine " RuntimeError: cannot call recv while another coroutine is already waiting for the next message Task exception was never retrieved future: <Task finished coro=<WebSocketClient.receiveMessage() done, defined at C:\Users\logan\OneDrive\Desktop\Financial\Project X-Ray\websocket_streaming.py:147> exception=RuntimeError('cannot call recv while another coroutine is already waiting for the next message')> Traceback (most recent call last): File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python37_64\lib\asyncio\tasks.py", line 249, in __step result = coro.send(None) File "C:\Users\logan\OneDrive\Desktop\Financial\Project X-Ray\websocket_streaming.py", line 155, in receiveMessage message = await connection.recv() File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python37_64\lib\site-packages\websockets\protocol.py", line 474, in recv "cannot call recv while another coroutine " RuntimeError: cannot call recv while another coroutine is already waiting for the next message Task exception was never retrieved future: <Task finished coro=<WebSocketClient.receiveMessage() done, defined at C:\Users\logan\OneDrive\Desktop\Financial\Project X-Ray\websocket_streaming.py:147> exception=KeyError('2')> Traceback (most recent call last): File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python37_64\lib\asyncio\tasks.py", line 249, in __step result = coro.send(None) File "C:\Users\logan\OneDrive\Desktop\Financial\Project X-Ray\websocket_streaming.py", line 175, in receiveMessage data_tuple = (data['service'], str(data['timestamp']), data['command'], data['content'][0]['key'], data['content'][0]['1'], data['content'][0]['2'], data['content'][0]['3'], KeyError: '2' 

And I have no idea what's wrong, especially since it seemed to work before I left. Anyone who can help would be greatly appreciated!

Submitted August 26, 2020 at 06:23PM by LMAY75
via https://ift.tt/31uuWzm

One thought on “Running into some problems tryign to get the TDA streaming API up and running”

  1. did you get this figured out?

    websockets.client.connect(uri, ping_interval=None)

    the ping_interval=None i believe is what resolved this issue for me…

    Came across this blog post while trying to figure out how to catch whatever the api is throwing at 4pm when the market closes.

    Cheers

    Like

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s