"""An event queue for managing order ID requests."""fromthreadingimportEvent,ThreadfromtimeimportsleepfromenumimportEnumfromtypingimportList,Callable,TYPE_CHECKINGfrom.._internal.threadingimportLoggingLockfrom.._internal.traceimporttrace_all_threads_str# Type hints on IbTwsClient cause a circular dependency.# This conditional import plus a string-based annotation avoids the problem.ifTYPE_CHECKING:from.tws_clientimportIbTwsClient
class OrderIdStrategy(Enum):
    """Strategy used to obtain order IDs.

    Each member is declared as a ``(retry, tws_request)`` tuple.  ``__new__``
    unpacks that tuple into two boolean attributes on the member:

    * ``retry`` -- whether requests should be retried.
    * ``tws_request`` -- whether every order ID is requested from TWS.
    """

    def __new__(cls, retry: bool, tws_request: bool):
        """Create a member and attach the ``retry`` and ``tws_request`` flags to it."""
        obj = object.__new__(cls)
        obj.retry = retry
        obj.tws_request = tws_request
        return obj

    INCREMENT = (False, False)
    """Use the initial order ID and increment the value upon every call.
    This is fast, but it may fail for multiple, concurrent sessions."""

    BASIC = (False, True)
    """Request a new order ID from TWS every time one is needed."""

    RETRY = (True, True)
    """Request a new order ID from TWS every time one is needed.
    Retry if TWS does not respond quickly.
    TWS seems to have a bug where it does not always respond."""
class OrderIdRequest:
    """An order ID request.

    Pairs the event that signals "an ID is available" with the callable that
    retrieves that ID.  The ID is fetched at most once and then cached, so
    repeated calls to :meth:`get` return the same value.
    """

    _event: Event
    _getter: Callable[[], int]
    _value: int  # None until the first successful get()
    _lock: LoggingLock

    def __init__(self, event: Event, getter: Callable[[], int]):
        """Create a request.

        Args:
            event: event that is set once an order ID is ready to be retrieved.
            getter: callable that returns the order ID; invoked at most once.
        """
        self._event = event
        self._getter = getter
        self._value = None
        self._lock = LoggingLock("OrderIdRequest")

    def get(self, time_out: float = 60.0) -> int:
        """A blocking call to get the order ID.

        Args:
            time_out: maximum number of seconds to wait for the event to be set.

        Returns:
            The order ID.  The value is cached, so later calls return the same ID.

        Raises:
            Exception: if the event is not set within ``time_out`` seconds.
                The message includes a trace of all threads.
        """
        event_happened = self._event.wait(time_out)

        if not event_happened:
            trace = trace_all_threads_str()
            msg = f"OrderIdRequest.get() timed out after {time_out} sec. A possible deadlock or TWS bug was detected! You may be able to avoid this problem by using a different OrderIdStrategy. Please create an issue at https://github.com/deephaven-examples/deephaven-ib/issues containing this error message\n{trace}\n"
            raise Exception(msg)

        with self._lock:
            # Fetch lazily and only once; the getter consumes a queued value.
            if self._value is None:
                self._value = self._getter()

            return self._value


class OrderIdEventQueue:
    """A thread-safe queue for requesting and getting order IDs.

    ``_events`` holds one event per outstanding request and ``_values`` holds
    the IDs that have been produced but not yet retrieved.  Both are used in
    first-in, first-out order and are mutated while holding ``_lock``.
    """

    _events: List[Event]
    _values: List[int]
    _lock: LoggingLock
    _client: 'IbTwsClient'
    _strategy: OrderIdStrategy
    _last_value: int  # None until add_value() has been called at least once
    _request_thread: Thread  # only assigned when the strategy has retry enabled

    def __init__(self, client: 'IbTwsClient', strategy: OrderIdStrategy):
        """Create the queue.

        Args:
            client: client whose ``reqIds`` method is used to request order IDs.
            strategy: strategy used to obtain order IDs.  When ``strategy.retry``
                is true, a daemon thread is started that re-issues requests.
        """
        self._events = []
        self._values = []
        self._lock = LoggingLock("OrderIdEventQueue")
        self._client = client
        self._strategy = strategy
        self._last_value = None

        if strategy.retry:
            self._request_thread = Thread(name="OrderIdEventQueueRetry", target=self._retry, daemon=True)
            self._request_thread.start()

    def _retry(self):
        """Re-requests IDs if there is no response.

        Runs forever (on the daemon thread started by ``__init__``).  Every
        pass issues one ``reqIds`` call per request that is still pending and
        then sleeps briefly.
        """
        while True:
            # Work from a snapshot of the pending count rather than iterating
            # the list itself: other threads append to and pop from _events
            # while this loop is running.
            for _ in range(len(self._events)):
                self._client.reqIds(-1)

            sleep(0.01)

    def request(self) -> OrderIdRequest:
        """Requests data from the queue.

        Registers a new pending request and triggers production of an ID,
        either by asking the client (``tws_request`` strategies) or by
        incrementing the last known value.

        Returns:
            A request object whose ``get`` method blocks until the ID is available.

        Raises:
            TypeError: for the incrementing strategy, if no initial order ID has
                been supplied through ``add_value`` yet.
        """
        event = Event()

        # NOTE(review): _increment_value() re-acquires self._lock while it is
        # held here, so this assumes LoggingLock is re-entrant -- confirm.
        with self._lock:
            self._events.append(event)

            try:
                if self._strategy.tws_request:
                    self._client.reqIds(-1)
                else:
                    self._increment_value()
            except BaseException:
                # Do not leave an orphaned waiter behind: it would swallow an
                # ID intended for a later request.
                if event in self._events:
                    self._events.remove(event)
                raise

        return OrderIdRequest(event, self._get)

    def add_value(self, value: int) -> None:
        """Adds a new value to the queue.

        Args:
            value: order ID to record and, if a request is waiting, to hand out.
        """
        with self._lock:
            # add_value is also called upon startup, which sets the initial value.
            self._last_value = value

            # Only queue the value when a request is waiting.  This filters out
            # values requested by ibapi during initialization.
            if self._events:
                self._values.append(value)
                event = self._events.pop(0)
                event.set()

    def _increment_value(self) -> None:
        """Increments the latest value and adds the value to the queue.

        Does nothing when no request is waiting.

        Raises:
            TypeError: if a request is waiting but no initial order ID has been
                recorded yet.  Nothing is modified in that case.
        """
        with self._lock:
            if not self._events:
                return

            # Validate before mutating so that a failure cannot leave a None
            # entry in _values to be handed out later as an order ID.
            if self._last_value is None:
                raise TypeError(
                    "OrderIdEventQueue cannot increment the order ID: "
                    "no initial order ID has been received yet"
                )

            self._values.append(self._last_value)
            self._last_value += 1
            event = self._events.pop(0)
            event.set()

    def _get(self) -> int:
        """Gets a value from the queue.

        Returns:
            The oldest queued order ID, which is removed from the queue.
        """
        with self._lock:
            return self._values.pop(0)