Execution Module¶
Haymaker trading algorithms consist of a series of Atom components, each implementing a step in a trading algorithm. These components are piped together in an event-driven fashion.
Common trading steps include:
Receiving price data.
Processing, aggregating, or filtering data.
Generating trading signals.
Managing the portfolio.
Controlling risk.
Managing execution.
Each processing component (called an “Atom”) inherits from haymaker.base.Atom.
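The event-driven piping described above can be illustrated with a small, self-contained sketch. It does not import Haymaker or eventkit: `Event`, `Step` and `Collector` are hypothetical stand-ins written only for this example, and the `+=` wiring merely mimics the shorthand that the Atom documentation mentions.

```python
class Event:
    """Tiny stand-in for an event object: stores listeners, calls them on emit."""

    def __init__(self):
        self._listeners = []

    def connect(self, listener):
        self._listeners.append(listener)

    def emit(self, *args):
        for listener in list(self._listeners):
            listener(*args)


class Step:
    """Hypothetical processing step (NOT haymaker.base.Atom)."""

    def __init__(self, name):
        self.name = name
        self.dataEvent = Event()

    def __iadd__(self, other):
        # `a += b` wires a's output event to b's input handler.
        self.dataEvent.connect(other.onData)
        return self

    def onData(self, data):
        # Add this step's contribution to the dict, then notify the next step.
        data = {**data, "path": data.get("path", []) + [self.name]}
        self.dataEvent.emit(data)


class Collector(Step):
    """Final step: keeps whatever reaches the end of the chain."""

    def __init__(self, name):
        super().__init__(name)
        self.received = []

    def onData(self, data):
        self.received.append(data)


streamer = Step("streamer")
brick = Step("brick")
sink = Collector("sink")
streamer += brick
brick += sink

streamer.onData({"price": 101.5})
print(sink.received)
```

Each step only knows about its own event, so steps can be rearranged or replaced without touching their neighbours.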
Atom Object¶
- class haymaker.base.Atom[source]¶
Abstract base object from which all other objects inherit. It’s a basic building block for creating strategies in Haymaker. Every Atom represents a processing step, typically for one traded instrument, thus allowing for separation of concerns. Connecting Atoms creates a processing pipeline. Atoms in a pipeline communicate with each other in an event-driven manner through three methods: onStart(), onData(), onFeedback(). Those methods are called when the appropriate eventkit.event.Event objects are emitted (respectively startEvent, dataEvent, feedbackEvent), and they emit their own events when they are done processing, thus signalling to the next Atom in the pipeline that it can start processing.
Users are free to put any processing logic they want in those methods, using any libraries and tools required.
Atoms can be connected in any order; unions can be created by connecting more than one Atom; pipelines can be created using the auxiliary class Pipe.
- contract¶
The contract object associated with this Atom. This can be any ib_insync.contract.Contract. On startup this contract will be qualified, and the available ib_insync.contract.ContractDetails will be downloaded from the broker and made available through the details attribute. If the contract is an ib_insync.contract.ContFuture or ib_insync.contract.Future, it will be replaced with the on-the-run ib_insync.contract.Future. ContFuture will pick the contract that IB considers to be current; Future allows for customization by tweaking FutureSelector. Whichever method is chosen, the onContractChanged() method will be called when the contract is about to be rolled.
This attribute doesn’t need to be set. If this Atom object is not related to any one particular contract, just don’t assign any value to this attribute.
- which_contract¶
default: ACTIVE; if NEXT is chosen, contract will return the next contract in the chain (relevant only for expiring contracts like futures or options), allowing new positions to use the upcoming contract a short period before it becomes active (the number of days prior to expiry during which NEXT will be used can be set in config).
- Type:
ActiveNext
- ib¶
The instance of the ib_insync.ib.IB client used for interacting with the broker. It can be used to communicate with the broker if necessary.
- Type:
ClassVar[ibi.IB]
- sm¶
Access to StateMachine, which is Haymaker’s central collection of information about current positions, orders and state of strategies.
- Type:
ClassVar[StateMachine]
- events¶
Collection of eventkit.Event objects used by Haymaker, i.e. startEvent, dataEvent, feedbackEvent. The appropriate methods should use these events to communicate with other objects in the chain, e.g. onStart(), after processing incoming data, should pass the result to the next object by calling self.dataEvent.emit(data).
- Type:
ClassVar[Sequence[str]]
- connect(*targets)[source]¶
Connect appropriate events and methods to subsequent Atom object(s) in the chain. Shorthand for this method is +=
- property details: Details | list[Details] | DetailsContainer¶
Contract details received from the broker.
If contract is set, details will be returned only for this contract; otherwise details will return a dictionary of all available details, i.e. dict[ibi.Contract, Details].
Details is a wrapper around ib_insync.contract.ContractDetails with some additional methods and attributes.
- disconnect(*targets)[source]¶
Disconnect passed Atom objects, which are directly connected to this atom. Shorthand for this method is -=
- onContractChanged(old_contract, new_contract)[source]¶
Will be called if the contract object on self.contract changes. In particular, this happens when a futures contract is about to expire and the new on-the-run contract replaces the old, expiring one. This method should be used to initialize any adjustment required on the object in relation to contract rolling. Actual position rolling is taken care of by the Controller object.
- onData(data, *args)[source]¶
Connected to dataEvent of the preceding object in the chain. This is the entry point to any processing performed by this object. The result of this processing should be added to the data dict and passed to the subsequent object in the chain using dataEvent (by calling self.dataEvent.emit(data)).
It’s up to the user to emit dataEvent with appropriate data; this event will NOT be emitted by the system, so if it’s not properly implemented, the event chain will be broken. This method must be overridden in a subclass.
Calling superclass on exit will add a timestamp with object’s name to data, which may be useful for logging.
This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.
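The sync-or-async choice can be pictured with the general asyncio pattern below. This is only a sketch of the idea, not Haymaker’s internal dispatch: `dispatch`, `sync_on_data` and `async_on_data` are hypothetical names, and the task is awaited immediately so that the example produces a deterministic result.

```python
import asyncio
import inspect

results = []


def sync_on_data(data):
    # A plain handler runs immediately.
    results.append(("sync", data["price"]))


async def async_on_data(data):
    # A coroutine handler may await I/O before producing its result.
    await asyncio.sleep(0)
    results.append(("async", data["price"]))


async def dispatch(handler, data):
    """Call handler; coroutine functions are scheduled on the running loop."""
    if inspect.iscoroutinefunction(handler):
        await asyncio.create_task(handler(data))
    else:
        handler(data)


async def main():
    await dispatch(sync_on_data, {"price": 1.0})
    await dispatch(async_on_data, {"price": 2.0})


asyncio.run(main())
print(results)
```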
- onFeedback(data, *args)[source]¶
Connected to feedbackEvent of the subsequent object in the chain. Allows for passing of information about trading results. Using it is optional; if used, the overridden method must emit feedbackEvent with appropriate data. If not overridden, it will just pass received data to the previous object in the chain.
This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onFeedback(self, data, *args)). If it’s async, it will be put in the asyncio loop.
- onStart(data, *args)[source]¶
Perform any initialization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.
The first Atom in a pipeline (typically a data streamer) will be called by the system, which is an indication that (re)start is in progress and we have successfully connected to the broker.
data by default is a dict, and all keys on this dict are set as properties on the object. Any information that needs to be passed to atoms down the chain should be appended to data without removing any existing keys.
If overriding the class, call superclass; the call to super().onStart(data) should be the last line in the overridden method; don’t manually emit startEvent in the subclass.
This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onStart(self, data, *args)). If it’s async, it will be put in the asyncio loop.
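The onStart() contract (keys become attributes, new information is appended, the superclass call comes last) might look roughly like this. `StartAware` and `DemoStreamer` are hypothetical stand-ins that do not import Haymaker, and the `next_steps` list replaces the real event wiring for brevity.

```python
class StartAware:
    """Hypothetical stand-in for the start-up behaviour (NOT haymaker.base.Atom)."""

    def __init__(self):
        self.next_steps = []

    def onStart(self, data):
        # Expose every key of the start-up dict as an attribute on the object.
        for key, value in data.items():
            setattr(self, key, value)
        # Hand the same dict to the steps further down the chain.
        for step in self.next_steps:
            step.onStart(data)


class DemoStreamer(StartAware):
    def onStart(self, data):
        # Append information for downstream steps; existing keys stay intact.
        data["bar_size"] = "30 secs"
        # The superclass call is the last line: it sets attributes and propagates.
        super().onStart(data)


streamer = DemoStreamer()
brick = StartAware()
streamer.next_steps.append(brick)

streamer.onStart({"strategy": "demo"})
print(brick.strategy, brick.bar_size)
```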
Auxiliary Objects¶
- class haymaker.base.Pipe(*targets)[source]¶
Auxiliary object for connecting several Atom objects. Atoms to be connected need to be passed in the right order at initialization. Pipe itself is a subclass of Atom, so it can be connected to other Atom (or Pipe) objects, and all Atom attributes and methods are available.
- connect(*targets)[source]¶
Connect appropriate events and methods to subsequent Atom object(s) in the chain. Shorthand for this method is +=
- disconnect(*targets)[source]¶
Disconnect passed Atom objects, which are directly connected to this atom. Shorthand for this method is -=
- onData(data, *args)[source]¶
Connected to dataEvent of the preceding object in the chain. This is the entry point to any processing performed by this object. The result of this processing should be added to the data dict and passed to the subsequent object in the chain using dataEvent (by calling self.dataEvent.emit(data)).
It’s up to the user to emit dataEvent with appropriate data; this event will NOT be emitted by the system, so if it’s not properly implemented, the event chain will be broken. This method must be overridden in a subclass.
Calling superclass on exit will add a timestamp with object’s name to data, which may be useful for logging.
This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.
- onFeedback(data, *args)[source]¶
Connected to feedbackEvent of the subsequent object in the chain. Allows for passing of information about trading results. Using it is optional; if used, the overridden method must emit feedbackEvent with appropriate data. If not overridden, it will just pass received data to the previous object in the chain.
This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onFeedback(self, data, *args)). If it’s async, it will be put in the asyncio loop.
- onStart(data, *args)[source]¶
Perform any initialization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.
The first Atom in a pipeline (typically a data streamer) will be called by the system, which is an indication that (re)start is in progress and we have successfully connected to the broker.
data by default is a dict, and all keys on this dict are set as properties on the object. Any information that needs to be passed to atoms down the chain should be appended to data without removing any existing keys.
If overriding the class, call superclass; the call to super().onStart(data) should be the last line in the overridden method; don’t manually emit startEvent in the subclass.
This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onStart(self, data, *args)). If it’s async, it will be put in the asyncio loop.
- class haymaker.base.Details(details)[source]¶
Wrapper object for ib_insync.contract.ContractDetails, extracting and processing information that’s most relevant for Haymaker.
- details¶
The original contract details object.
- Type:
ibi.ContractDetails
- trading_hours¶
List of tuples with start and end of trading hours for this contract.
- liquid_hours¶
List of tuples with start and end of liquid hours for this contract.
- is_liquid(_now=None)[source]¶
Given the current time, check if the market is in liquid hours for the underlying contract.
- Parameters:
_now (Optional[datetime], optional) – Defaults to None. If not provided, current time will be used. The only situation when it’s useful to provide _now is in testing.
- Returns:
True if market is liquid, False otherwise.
- is_open(_now=None)[source]¶
Given the current time, check if the market is open for the underlying contract.
- Parameters:
_now (Optional[datetime], optional) – Defaults to None. If not provided, current time will be used. The only situation when it’s useful to provide _now is in testing.
- Returns:
True if market is open, False otherwise.
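Since trading_hours and liquid_hours are described as lists of (start, end) tuples, a check such as is_open() can be thought of as a membership test over those intervals. The sketch below is an assumption about the general idea only: `is_within` is a hypothetical helper, and treating the end of a session as exclusive is a choice made for this example.

```python
from datetime import datetime, timezone


def is_within(sessions, now=None):
    """Return True if `now` falls inside any (start, end) session tuple."""
    if now is None:
        # Mirrors the documented default: use current time unless testing.
        now = datetime.now(timezone.utc)
    return any(start <= now < end for start, end in sessions)


utc = timezone.utc
sessions = [
    (datetime(2024, 1, 2, 14, 30, tzinfo=utc), datetime(2024, 1, 2, 21, 0, tzinfo=utc)),
    (datetime(2024, 1, 3, 14, 30, tzinfo=utc), datetime(2024, 1, 3, 21, 0, tzinfo=utc)),
]

during = is_within(sessions, datetime(2024, 1, 2, 15, 0, tzinfo=utc))
after = is_within(sessions, datetime(2024, 1, 2, 22, 0, tzinfo=utc))
print(during, after)
```

Passing an explicit time, as the `_now` parameter allows, keeps such checks testable without depending on the clock.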
Strategy Building Components¶
Building on haymaker.base.Atom, Haymaker offers skeletons of several components addressing typical requirements in building trading strategies:
Streamer: Connects to the broker and pipes market data.
Aggregator: Custom aggregation or processing of market data before signal generation.
Brick: Generates trading signals; this is the core building block of strategies (like bricks in a house).
Note
If you have an idea for a better name, email me! :)
Signal Processor: Filters or processes signals based on strategy state or auxiliary data, determining whether signals should trigger orders.
Portfolio: A global object that receives processed signals and translates them into allocations (e.g., amounts of instruments to trade). It uses data like account value, holdings, volatility, risk targets, and concentration limits.
Execution Model: Issues actual broker orders based on target instrument amounts.
Below is a review of how to use pre-built component modules:
Streamer¶
Haymaker provides streamers corresponding to all ib_insync market data subscriptions:
ib_insync Method          Streamer
reqHistoricalDataAsync    HistoricalDataStreamer
reqMktData                MktDataStreamer
reqRealTimeBars           RealTimeBarsStreamer
reqTickByTickData         TickByTickStreamer
All streamers extend haymaker.streamers.Streamer.
- class haymaker.streamers.Streamer[source]¶
- classmethod awaitables()[source]¶
Coroutines from all instantiated streamers. Can be passed to asyncio.gather()
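One way to picture a class-level collection of coroutines being handed to asyncio.gather() is sketched below. `DemoStreamer`, its `instances` registry and its `run()` coroutine are hypothetical and do not reflect how haymaker.streamers.Streamer is implemented; unpacking the list with `*` is likewise an assumption about usage.

```python
import asyncio


class DemoStreamer:
    """Hypothetical streamer that registers every instance on the class."""

    instances = []

    def __init__(self, name):
        self.name = name
        DemoStreamer.instances.append(self)

    async def run(self):
        await asyncio.sleep(0)  # stands in for subscribing to market data
        return f"{self.name} done"

    @classmethod
    def awaitables(cls):
        # One coroutine per streamer instantiated so far.
        return [instance.run() for instance in cls.instances]


DemoStreamer("ES")
DemoStreamer("NQ")


async def main():
    return await asyncio.gather(*DemoStreamer.awaitables())


outcome = asyncio.run(main())
print(outcome)
```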
- onStart(data, *args)[source]¶
Perform any initialization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.
The first Atom in a pipeline (typically a data streamer) will be called by the system, which is an indication that (re)start is in progress and we have successfully connected to the broker.
data by default is a dict, and all keys on this dict are set as properties on the object. Any information that needs to be passed to atoms down the chain should be appended to data without removing any existing keys.
If overriding the class, call superclass; the call to super().onStart(data) should be the last line in the overridden method; don’t manually emit startEvent in the subclass.
This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onStart(self, data, *args)). If it’s async, it will be put in the asyncio loop.
Implementations¶
Every implementation accepts the same arguments as the respective ib_insync method it wraps, plus standard haymaker.streamers.Streamer parameters.
- class haymaker.streamers.HistoricalDataStreamer(contract, durationStr, barSizeSetting, whatToShow, useRTH=False, formatDate=2, incremental_only=True, startup_seconds=5, _last_bar_date=None, _future_adjust_flag=False, _adjusted=<factory>)[source]¶
- date_to_delta(date)[source]¶
Return number of bars (as per barSizeSetting) since date. Used to determine number of bars required to backfill since last reset.
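The arithmetic behind counting bars since a date can be sketched as follows. This is not the date_to_delta() implementation: `bar_size_to_timedelta` and `bars_since` are hypothetical helpers, the unit table covers only a few bar size strings, and market closures are ignored.

```python
from datetime import datetime, timedelta

# Seconds per unit for a few bar size strings such as "30 secs" or "1 hour".
UNIT_SECONDS = {"sec": 1, "min": 60, "hour": 3600, "day": 86400}


def bar_size_to_timedelta(bar_size):
    """Convert a string like '5 mins' into a timedelta."""
    number, unit = bar_size.split()
    unit = unit.rstrip("s")  # "secs" -> "sec", "mins" -> "min"
    return timedelta(seconds=int(number) * UNIT_SECONDS[unit])


def bars_since(date, bar_size, now):
    """Number of whole bars of the given size between `date` and `now`."""
    elapsed = now - date
    return max(0, elapsed // bar_size_to_timedelta(bar_size))


last_bar = datetime(2024, 1, 2, 10, 0)
print(bars_since(last_bar, "30 secs", datetime(2024, 1, 2, 10, 5)))
print(bars_since(last_bar, "1 hour", datetime(2024, 1, 2, 12, 30)))
```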
- onContractChanged(old_contract, new_contract)[source]¶
Will be called if the contract object on self.contract changes. In particular, this happens when a futures contract is about to expire and the new on-the-run contract replaces the old, expiring one. This method should be used to initialize any adjustment required on the object in relation to contract rolling. Actual position rolling is taken care of by the Controller object.
- onStart(data, *args)[source]¶
Perform any initialization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.
The first Atom in a pipeline (typically a data streamer) will be called by the system, which is an indication that (re)start is in progress and we have successfully connected to the broker.
data by default is a dict, and all keys on this dict are set as properties on the object. Any information that needs to be passed to atoms down the chain should be appended to data without removing any existing keys.
If overriding the class, call superclass; the call to super().onStart(data) should be the last line in the overridden method; don’t manually emit startEvent in the subclass.
This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onStart(self, data, *args)). If it’s async, it will be put in the asyncio loop.
- class haymaker.streamers.RealTimeBarsStreamer(contract, whatToShow, useRTH, incremental_only=True)[source]¶
- onStart(data, *args)[source]¶
Perform any initialization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.
The first Atom in a pipeline (typically a data streamer) will be called by the system, which is an indication that (re)start is in progress and we have successfully connected to the broker.
data by default is a dict, and all keys on this dict are set as properties on the object. Any information that needs to be passed to atoms down the chain should be appended to data without removing any existing keys.
If overriding the class, call superclass; the call to super().onStart(data) should be the last line in the overridden method; don’t manually emit startEvent in the subclass.
This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onStart(self, data, *args)). If it’s async, it will be put in the asyncio loop.
Aggregator¶
- class haymaker.aggregators.BarAggregator(filter, incremental_only=False, future_adjust_type='add', debug=False)[source]¶
Aggregate received data bars into new bars based on the criteria specified in filter. Store processed data.
- adjust_future(new_bar)[source]¶
Create a continuous futures price series on futures contract change. The IB mechanism cannot be trusted. This feature can be turned off by passing future_adjust_type=None when instantiating the class.
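As an illustration of what an additive adjustment means, the sketch below shifts the old contract’s prices by the gap to the new contract at the roll. It is not the adjust_future() implementation: `back_adjust` is a hypothetical function, and the `"mul"` branch is an assumed alternative that does not appear in the documentation above (only `'add'` and `None` do).

```python
def back_adjust(old_closes, old_last, new_first, how="add"):
    """Rebase prices of the expiring contract onto the new contract's level."""
    if how is None:
        # Adjustment switched off: return prices unchanged.
        return list(old_closes)
    if how == "add":
        # Shift every old price by the price gap observed at the roll.
        offset = new_first - old_last
        return [price + offset for price in old_closes]
    if how == "mul":
        # Assumed alternative: scale by the price ratio at the roll.
        ratio = new_first / old_last
        return [price * ratio for price in old_closes]
    raise ValueError(f"unknown adjustment type: {how!r}")


adjusted = back_adjust([100.0, 101.0, 102.0], old_last=102.0, new_first=105.0)
print(adjusted)
```

After the shift, the last adjusted price of the old contract equals the first price of the new one, so indicators computed across the roll do not see an artificial jump.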
- onDataBar(bars, *args)[source]¶
This is connected to self._filter, will emit whatever comes from filter.
- onStart(data, *args)[source]¶
Perform any initialization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.
The first Atom in a pipeline (typically a data streamer) will be called by the system, which is an indication that (re)start is in progress and we have successfully connected to the broker.
data by default is a dict, and all keys on this dict are set as properties on the object. Any information that needs to be passed to atoms down the chain should be appended to data without removing any existing keys.
If overriding the class, call superclass; the call to super().onStart(data) should be the last line in the overridden method; don’t manually emit startEvent in the subclass.
This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onStart(self, data, *args)). If it’s async, it will be put in the asyncio loop.
Available Filters¶
Brick¶
- class haymaker.brick.AbstractBaseBrick(strategy, contract)[source]¶
- onData(data, *args)[source]¶
Connected to dataEvent of the preceding object in the chain. This is the entry point to any processing performed by this object. The result of this processing should be added to the data dict and passed to the subsequent object in the chain using dataEvent (by calling self.dataEvent.emit(data)).
It’s up to the user to emit dataEvent with appropriate data; this event will NOT be emitted by the system, so if it’s not properly implemented, the event chain will be broken. This method must be overridden in a subclass.
Calling superclass on exit will add a timestamp with object’s name to data, which may be useful for logging.
This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.
- onStart(data, *args)[source]¶
Perform any initialization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.
The first Atom in a pipeline (typically a data streamer) will be called by the system, which is an indication that (re)start is in progress and we have successfully connected to the broker.
data by default is a dict, and all keys on this dict are set as properties on the object. Any information that needs to be passed to atoms down the chain should be appended to data without removing any existing keys.
If overriding the class, call superclass; the call to super().onStart(data) should be the last line in the overridden method; don’t manually emit startEvent in the subclass.
This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onStart(self, data, *args)). If it’s async, it will be put in the asyncio loop.
Implementations¶
Override haymaker.brick.AbstractDfBrick.df() when creating a concrete AbstractDfBrick.
Signal Processor¶
To better separate concerns, filter signals received from haymaker.brick.AbstractBaseBrick based on strategy state (e.g., avoid repeated signals or re-entering after a stop-out). While this could be done in haymaker.brick.AbstractBaseBrick, using a haymaker.signals.BinarySignalProcessor is more modular.
Available processors are designed for binary signals (on/off switches). For discrete signals (e.g., 0–10 strength), these implementations aren’t suitable, but you can develop custom ones.
It’s easiest to create a binary signal processor by implementing haymaker.signals.AbstractBaseBinarySignalProcessor.
- class haymaker.signals.AbstractBaseBinarySignalProcessor(signal_fields='signal', state_machine=None)[source]¶
Process binary signals, i.e. long/short/off, as opposed to discrete signals, where signal strength is meaningful (e.g. signal assuming values -10…10 based on strength of conviction).
Actual position size, or even whether the position should be taken at all, is not determined here; it’s the job of Portfolio.
Actual meaning of signals is defined in sub-classes, by overriding methods: process_position() and process_no_position().
Whatever the meaning of the signal coming in, signal coming out means strategy wants to take action in the direction of signal, as indicated by keys action and signal in the emitted dict. Incoming signals that don’t require any action will be stopped here and not propagated down the chain.
In sub-class names, blip means zero signal should be ignored; otherwise absence of signal means there should be no position.
Args:¶
signal_fields - if str, a single field of this name is used as signal in (open position) and signal out (close position); if tuple, then the first element is signal in and the second is signal out
state_machine - this is for testing only and should not be passed in a non-testing environment
- onData(data, *args)[source]¶
Connected to dataEvent of the preceding object in the chain. This is the entry point to any processing performed by this object. The result of this processing should be added to the data dict and passed to the subsequent object in the chain using dataEvent (by calling self.dataEvent.emit(data)).
It’s up to the user to emit dataEvent with appropriate data; this event will NOT be emitted by the system, so if it’s not properly implemented, the event chain will be broken. This method must be overridden in a subclass.
Calling superclass on exit will add a timestamp with object’s name to data, which may be useful for logging.
This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.
Implementations¶
- class haymaker.signals.BinarySignalProcessor(signal_fields='signal', state_machine=None)[source]¶
Zero signal means close position if position exists
Non-zero signal means:
** open new position if there is no position for the strategy
** ignore signal if it’s in the same direction as existing position
** close position if the signal is in the direction opposite to existing position
- class haymaker.signals.BlipBinarySignalProcessor(signal_fields='signal', state_machine=None)[source]¶
Zero signal means do nothing
Non-zero signal means:
** open new position if there is no position for the strategy
** ignore signal if it’s in the same direction as existing position
** close position if the signal is in the direction opposite to existing position
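The rules listed for the two processors above can be condensed into one small decision function. This is a sketch of the documented rules only, not the library code: `decide` is a hypothetical name, positions and signals are assumed to be encoded as -1, 0 or 1, and the `blip` flag switches between the two zero-signal behaviours.

```python
def decide(position, signal, blip=False):
    """Return 'OPEN', 'CLOSE' or None (no action) for a binary signal."""
    if signal == 0:
        # Blip variant ignores zero signals; otherwise zero closes a position.
        if blip or position == 0:
            return None
        return "CLOSE"
    if position == 0:
        return "OPEN"
    if signal == position:
        # Same direction as the existing position: nothing to do.
        return None
    # Opposite direction: close the existing position.
    return "CLOSE"


for position, signal in [(0, 1), (1, 1), (1, -1), (1, 0)]:
    print(position, signal, decide(position, signal), decide(position, signal, blip=True))
```

Only the zero-signal row differs between the two variants: with a long position and a zero signal, the non-blip rule closes while the blip rule does nothing.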
- class haymaker.signals.LockableBinarySignalProcessor(signal_fields='signal', state_machine=None)[source]¶
Signals in the direction of last position are ignored (one side of the market is ‘locked’). It’s up to StateMachine to determine which side is ‘locked’ based on position actually taken in the market (not just previously generated signals).
Zero signal means close position if position exists
Non-zero signal means:
** open new position if there is no position for the strategy
** ignore signal if it’s in the same direction as existing position
** close position if the signal is in the direction opposite to existing position
- onData(data, *args)[source]¶
Connected to dataEvent of the preceding object in the chain. This is the entry point to any processing performed by this object. The result of this processing should be added to the data dict and passed to the subsequent object in the chain using dataEvent (by calling self.dataEvent.emit(data)).
It’s up to the user to emit dataEvent with appropriate data; this event will NOT be emitted by the system, so if it’s not properly implemented, the event chain will be broken. This method must be overridden in a subclass.
Calling superclass on exit will add a timestamp with object’s name to data, which may be useful for logging.
This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.
- class haymaker.signals.LockableBlipBinarySignalProcessor(signal_fields='signal', state_machine=None)[source]¶
Signals in the direction of last position are ignored (one side of the market is ‘locked’). It’s up to StateMachine to determine which side is ‘locked’ based on position actually taken in the market (not just previously generated signals).
Zero signal means do nothing
Non-zero signal means:
** open new position if there is no position for the strategy
** ignore signal if it’s in the same direction as existing position
** close position if the signal is in the direction opposite to existing position
- class haymaker.signals.AlwaysOnLockableBinarySignalProcessor(signal_fields='signal', state_machine=None)[source]¶
Signals in the direction of last position are ignored (one side of the market is ‘locked’). It’s up to StateMachine to determine which side is ‘locked’ based on position actually taken in the market (not just previously generated signals).
Zero signal means close position if position exists
Non-zero signal means:
** open new position if there is no position for the strategy
** ignore signal if it’s in the same direction as existing position
** reverse position if the signal is in the direction opposite to existing position
- class haymaker.signals.AlwaysOnBinarySignalProcessor(signal_fields='signal', state_machine=None)[source]¶
Zero signal means close position if position exists
Non-zero signal means:
** open new position if there is no position for the strategy
** ignore signal if it’s in the same direction as existing position
** close position if the signal is in the direction opposite to existing position
Factory Function¶
- haymaker.signals.binary_signal_processor_factory(lockable=False, always_on=False)[source]¶
Helper function to return appropriate class based on parameters.
- Parameters:
lockable – if True, no signals in the direction of the last position are allowed if that position was stopped out (they are allowed if the position was closed through means other than stop-loss)
always_on – if True, a ‘CLOSE’ signal also opens a position in the reverse direction
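A flag-to-class lookup of this kind could be sketched as a simple table. The mapping below is inferred purely from the class names listed in this section and is an assumption, not the documented behaviour of binary_signal_processor_factory(); `choose_processor_name` is a hypothetical function that returns names as strings rather than real classes.

```python
def choose_processor_name(lockable=False, always_on=False):
    """Map the two flags to a processor class name (assumed mapping)."""
    table = {
        (False, False): "BinarySignalProcessor",
        (True, False): "LockableBinarySignalProcessor",
        (False, True): "AlwaysOnBinarySignalProcessor",
        (True, True): "AlwaysOnLockableBinarySignalProcessor",
    }
    return table[(bool(lockable), bool(always_on))]


print(choose_processor_name())
print(choose_processor_name(lockable=True, always_on=True))
```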
Portfolio¶
- class haymaker.portfolio.AbstractBasePortfolio(*args, **kwargs)[source]¶
Decides if, what, and how much to trade based on received signals and queries to [SM?].
Each strategy should have its own instance of portfolio to ensure that signals from various strategies are not mixed up. The actual singleton portfolio object should be passed to those instances, which should delegate allocation to this object.
Implementations¶
Wrapper¶
- class haymaker.portfolio.PortfolioWrapper[source]¶
- onData(data, *args)[source]¶
Connected to dataEvent of the preceding object in the chain. This is the entry point to any processing performed by this object. The result of this processing should be added to the data dict and passed to the subsequent object in the chain using dataEvent (by calling self.dataEvent.emit(data)).
It’s up to the user to emit dataEvent with appropriate data; this event will NOT be emitted by the system, so if it’s not properly implemented, the event chain will be broken. This method must be overridden in a subclass.
Calling superclass on exit will add a timestamp with object’s name to data, which may be useful for logging.
This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.
An instance of haymaker.portfolio.AbstractBasePortfolio should never be directly included in a processing pipeline, as there should be only one portfolio for all strategies. Instead, include an instance of haymaker.portfolio.PortfolioWrapper in a pipeline. As long as an instance of haymaker.portfolio.AbstractBasePortfolio exists in your package, haymaker.portfolio.PortfolioWrapper ensures it’s connected.
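The wrapper-plus-singleton arrangement can be pictured as several lightweight per-strategy objects delegating to one shared portfolio. The sketch below is a generic illustration of that delegation pattern: `DemoPortfolio`, `DemoWrapper`, the `instance` class attribute and the `allocate()` method are all hypothetical and say nothing about how PortfolioWrapper actually locates the portfolio.

```python
class DemoPortfolio:
    """Hypothetical shared portfolio: remembers the most recent instance."""

    instance = None

    def __init__(self):
        DemoPortfolio.instance = self
        self.seen = []

    def allocate(self, data):
        # A real portfolio would size the trade; here a fixed amount is added.
        self.seen.append(data["strategy"])
        return {**data, "amount": 1}


class DemoWrapper:
    """Hypothetical per-pipeline wrapper that delegates to the shared portfolio."""

    def onData(self, data):
        if DemoPortfolio.instance is None:
            raise RuntimeError("no portfolio instance has been created")
        return DemoPortfolio.instance.allocate(data)


portfolio = DemoPortfolio()
wrapper_a = DemoWrapper()
wrapper_b = DemoWrapper()

first = wrapper_a.onData({"strategy": "s1", "signal": 1})
second = wrapper_b.onData({"strategy": "s2", "signal": -1})
print(portfolio.seen, first["amount"], second["amount"])
```

Because every wrapper ends up calling the same object, allocation decisions can take all strategies into account at once.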
Execution Model¶
It’s easiest to create an execution model by extending haymaker.execution_models.AbstractExecModel.
- class haymaker.execution_models.AbstractExecModel(*, open_order={}, close_order={}, controller=None)[source]¶
Intermediary between Portfolio and Trader. It translates strategy signals into orders acceptable by Interactive Brokers. May also implement market execution strategy, monitor post-order events or manage any other function related to order execution.
- abstractmethod onData(data, *args)[source]¶
Must use self.trade(ibi.Contract, ibi.Order, strategy_key, reason) to send orders for execution, and subsequently link any ibi.Trade events returned by self.trade() to required callbacks.
While opening a position, must set self.contract to the ibi.Contract that has been used.
Must keep track of current position in the market by updating self.data.position.
Args:¶
data (dict): a dict created by Brick and updated by Portfolio, which must contain all parameters required to execute transactions in line with this execution model.
Returns:¶
(trade, note), where:
trade: ibi.Trade object for the issued order
note: info string for loggers and blotters about the character of the transaction (open, close, stop, etc.)
- onStart(data, *args)[source]¶
Perform any initialization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.
The first Atom in a pipeline (typically a data streamer) will be called by the system, which is an indication that (re)start is in progress and we have successfully connected to the broker.
data by default is a dict, and all keys on this dict are set as properties on the object. Any information that needs to be passed to atoms down the chain should be appended to data without removing any existing keys.
If overriding the class, call superclass; the call to super().onStart(data) should be the last line in the overridden method; don’t manually emit startEvent in the subclass.
This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onStart(self, data, *args)). If it’s async, it will be put in the asyncio loop.
Implementations¶
- class haymaker.execution_models.BaseExecModel(*, open_order={}, close_order={}, controller=None)[source]¶
Bases:
AbstractExecModelOrders generated based on data sent to
onData(), of which following keys are required:action: must be one of:OPEN,CLOSE,REVERSEsignaldetermines transaction direction, must be one of {-1, 1} for sell/buy respectively; irrelevant forCLOSE, where direction is determined by current positioncontract- thisibi.Contractinstance will be tradedamount- quantity of ``contract``s that will be tradedtarget_position- one of {-1, 0, 1} determining direction AFTER transaction is executed; will be used byControllerto verify if the transaction’s effect was as desired
Enters and closes positions based on params sent to onData(). Orders are composed by open() and close(), which can be overridden or extended in subclasses to get more complex behaviour.
- onData(data, *args)[source]¶
Must use self.trade(ibi.Contract, ibi.Order, strategy_key, reason) to send orders for execution, and subsequently link any ibi.Trade events returned by self.trade() to the required callbacks.
While opening a position, must set self.contract to the ibi.Contract that has been used.
Must keep track of the current position in the market by updating self.data.position.
Args:¶
data (dict): a dict created by Brick, updated by Portfolio, which must contain all parameters required to execute transactions in line with this execution model.
Returns:¶
(trade, note), where:
- trade: ibi.Trade object for the issued order
- note: info string for loggers and blotters about the character of the transaction (open, close, stop, etc.)
- class haymaker.execution_models.EventDrivenExecModel(*, open_order={}, close_order={}, stop_order={}, tp_order={}, stop=None, take_profit=None, oca_type=2, controller=None)[source]¶
Bases: BaseExecModel
Use events to attach stop-loss and optional take-profit orders after execution of the open order. After a close transaction, remove existing bracketing orders.
Parameters (keyword only):¶
- open_order, close_order, stop_order, tp_order: dict, optional
may be used to define the order type used for the respective transactions; passed dicts will be used to extend and/or update any defaults from config
- stop: AbstractBracketLeg instance, must be provided
manages creation of the stop-loss order
- take_profit: AbstractBracketLeg instance, optional
manages creation of the take-profit order
- oca_type: int, default 2
OCA group type as per Interactive Brokers definition
- controller: Controller instance, optional
passing Controller is meant for testing; otherwise the system should be allowed to use its own mechanisms to create it
- close(data, dynamic_order_kwargs=None)[source]¶
On top of the actions performed by the base class, this method will attach an OCA group that will cancel any brackets after order execution.
- onStart(data, *args)[source]¶
Perform any initialization required on system (re)start. It will be run automatically and it will be linked to the startEvent of the preceding object in the chain.
The first Atom in a pipeline (typically a data streamer) will be called by the system, which is an indication that a (re)start is in progress and that we have successfully connected to the broker.
data by default is a dict, and all keys of this dict are set as properties on the object. Any information that needs to be passed to atoms down the chain should be appended to data without removing any existing keys.
If overriding this method, call the superclass: the call to super().onStart(data) should be the last line of the overridden method; don’t manually emit startEvent in the subclass.
This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onStart(self, data, *args)). If it’s async, it will be put in the asyncio loop.
This model automatically places stop-loss orders (and potentially take-profit orders) when the original order is filled.
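The bracketing idea can be sketched without a broker connection. In the sketch below, SketchOrder and bracket_after_fill are hypothetical names; only the field names ocaGroup and ocaType are borrowed from Interactive Brokers' order fields. It assumes fixed price distances for both legs, which is a simplification: the actual legs are produced by the AbstractBracketLeg instances passed as stop and take_profit.

```python
from dataclasses import dataclass


@dataclass
class SketchOrder:
    """Hypothetical stand-in holding a few fields an IB order would also carry."""

    action: str           # "BUY" or "SELL"
    totalQuantity: float
    orderType: str        # "STP" for the stop leg, "LMT" for the take-profit leg
    price: float
    ocaGroup: str = ""
    ocaType: int = 0


def bracket_after_fill(entry_action, quantity, fill_price, stop_distance,
                       tp_distance=None, oca_group="bracket-1", oca_type=2):
    """Build exit legs for a filled entry; all legs share one OCA group."""
    direction = 1 if entry_action == "BUY" else -1
    exit_action = "SELL" if entry_action == "BUY" else "BUY"
    # The stop-loss sits on the losing side of the fill price.
    legs = [SketchOrder(exit_action, quantity, "STP",
                        fill_price - direction * stop_distance,
                        oca_group, oca_type)]
    if tp_distance is not None:
        # The optional take-profit sits on the winning side.
        legs.append(SketchOrder(exit_action, quantity, "LMT",
                                fill_price + direction * tp_distance,
                                oca_group, oca_type))
    return legs


for leg in bracket_after_fill("BUY", 1, 5000.0, 30.0, 60.0):
    print(leg)
```

Placing both legs in the same OCA group is what lets the broker cancel the remaining leg once the other one executes.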
Example Usage¶
from dataclasses import dataclass

import ib_insync as ibi
import numpy as np
import pandas as pd

from haymaker import (
    aggregators,
    app,
    base,
    bracket_legs,
    brick,
    execution_models,
    indicators,
    portfolio,
    signals,
    streamers,
)


@dataclass
class EMACrossStrategy(brick.AbstractDfBrick):
    strategy: str
    contract: ibi.Contract
    fast_lookback: int
    slow_lookback: int
    atr_lookback: int

    def df(self, df: pd.DataFrame) -> pd.DataFrame:
        df["fast_ema"] = df["close"].ewm(self.fast_lookback).mean()
        df["slow_ema"] = df["close"].ewm(self.slow_lookback).mean()
        # 1 for long, 0 for no position, -1 for short
        df["signal"] = np.sign(df["fast_ema"] - df["slow_ema"])
        # extra column used downstream by the trailing stop
        df["atr"] = indicators.atr(df, self.atr_lookback)
        return df


es_contract = ibi.ContFuture("ES", "CME")

# always trade one contract
portfolio.FixedPortfolio(1)

pipe = base.Pipe(
    streamers.HistoricalDataStreamer(es_contract, "10 D", "1 hour", "TRADES"),
    aggregators.BarAggregator(aggregators.NoFilter()),
    EMACrossStrategy("ema_cross_ES", es_contract, 12, 48, 24),
    signals.BinarySignalProcessor(),
    portfolio.PortfolioWrapper(),
    execution_models.EventDrivenExecModel(
        stop=bracket_legs.TrailingStop(3, vol_field="atr")
    ),
)

if __name__ == "__main__":
    app.App().run()
Warning
NOT INVESTMENT ADVICE
This example is only meant to illustrate how to use the Haymaker framework. It is unlikely to produce favorable investment outcomes.
Example Walk-Through¶
This is a simple example implementing a moving average crossover strategy with a stop-loss.
The strategy:
Buys 1 e-mini S&P futures contract (‘ES’) whenever the faster exponential moving average (EMA) crosses above the slower one.
Sells 1 contract when the faster EMA crosses below the slower EMA.
The moment a position-opening order is filled, places a trailing stop-loss order with a distance based on the instrument’s current Average True Range.
Whenever the stop-loss is hit, prevents reopening a position in the same direction until an opposite position is opened and closed. This protects against repeated transactions in a volatile, non-trending market.
Reverses the position when a signal indicates a direction opposite to the position currently held.
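The signal-handling rules in the list above can be written down as a small state machine. SignalGate below is a hypothetical sketch of those rules only; it is not the code of signals.BinarySignalProcessor, which the walk-through uses for this purpose. Two assumptions are made: a signal of 0 is treated as "do nothing", and reversing out of the opposite position counts as that position having been opened and closed.

```python
class SignalGate:
    """Hypothetical sketch of the rules above; not haymaker's implementation."""

    def __init__(self):
        self.position = 0  # -1 short, 0 flat, 1 long
        self.blocked = 0   # direction barred after a stop-out, 0 means none

    def on_signal(self, signal):
        """Return "OPEN", "REVERSE" or None for a signal in {-1, 0, 1}."""
        if signal == 0 or signal == self.position:
            return None  # nothing new to act on
        if self.position == 0:
            if signal == self.blocked:
                return None  # same direction as the last stop-out
            self.position = signal
            return "OPEN"
        # Opposite signal while in a position: close it, open the other side.
        self._closed(self.position)
        self.position = signal
        return "REVERSE"

    def on_stop_hit(self):
        """Record that the stop-loss closed the current position."""
        if self.position == 0:
            return
        stopped = self.position
        self._closed(stopped)
        self.position = 0
        self.blocked = stopped

    def _closed(self, direction):
        # Closing a position opposite to the barred direction lifts the bar.
        if direction == -self.blocked:
            self.blocked = 0


gate = SignalGate()
print(gate.on_signal(1))   # opens a long position
gate.on_stop_hit()         # long is stopped out; longs are now barred
print(gate.on_signal(1))   # ignored
print(gate.on_signal(-1))  # opposite direction is still allowed
```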
from dataclasses import dataclass

import pandas as pd
import numpy as np
from haymaker import brick, indicators
import ib_insync as ibi


@dataclass
class EMACrossStrategy(brick.AbstractDfBrick):
    strategy: str
    contract: ibi.Contract
    fast_lookback: int
    slow_lookback: int
    atr_lookback: int

    def df(self, df: pd.DataFrame) -> pd.DataFrame:
        df["fast_ema"] = df["close"].ewm(self.fast_lookback).mean()
        df["slow_ema"] = df["close"].ewm(self.slow_lookback).mean()
        df["signal"] = np.sign(df["fast_ema"] - df["slow_ema"])
        df["atr"] = indicators.atr(df, self.atr_lookback)
        return df
This defines the trading signals using haymaker.brick.AbstractDfBrick. It requires a dataclasses.dataclass with the strategy name, contract, and parameters. The haymaker.brick.AbstractDfBrick.df() method must be overridden to process a pandas.DataFrame containing market data (e.g., Open, High, Low, Close, Volume, AveragePrice), depending on the connected streamers and processors.
The data received via onData is wrapped into a pandas.DataFrame with column names matching the keys in the data dictionary. Users must ensure upstream components provide all required data for signal generation.
The haymaker.brick.AbstractDfBrick.df() method must return a pandas.DataFrame with a signal column: 1 for long, 0 for no position, -1 for short. Additional columns (e.g., atr) can be included for downstream components.
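To make the signal column concrete, here is a dependency-free sketch of the same calculation on a plain list of closes. It relies on two facts about pandas that are worth checking against your installed version: the first positional argument of Series.ewm is com (centre of mass), so ewm(12) uses a smoothing factor alpha = 1 / (1 + 12), and the default adjust=True computes a weighted average with weights (1 - alpha)^i. The helper names ewm_mean, sign and crossover_signal are hypothetical.

```python
def ewm_mean(values, com):
    """Exponentially weighted mean with alpha = 1 / (1 + com), adjusted weights."""
    decay = 1.0 - 1.0 / (1.0 + com)
    out = []
    num = 0.0  # running sum of weight * value
    den = 0.0  # running sum of weights
    for x in values:
        num = x + decay * num
        den = 1.0 + decay * den
        out.append(num / den)
    return out


def sign(x):
    """Return 1, 0 or -1 depending on the sign of x."""
    return (x > 0) - (x < 0)


def crossover_signal(closes, fast, slow):
    """1 where the fast average is above the slow one, -1 below, 0 if equal."""
    fast_ema = ewm_mean(closes, fast)
    slow_ema = ewm_mean(closes, slow)
    return [sign(f - s) for f, s in zip(fast_ema, slow_ema)]


print(crossover_signal([1.0, 2.0, 3.0, 4.0, 5.0], fast=1, slow=3))
```

If a span-based average is intended instead, pandas accepts it as a keyword, e.g. ewm(span=12).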
es_contract = ibi.ContFuture("ES", "CME")
The ib_insync.contract.ContFuture contract is not directly tradable. Haymaker replaces it with the current on-the-run futures contract and rolls it to the next contract near expiration. Refer to other documentation sections for customization details.
from haymaker import portfolio
portfolio.FixedPortfolio(1)
Typically, a portfolio object would include more logic. Here, haymaker.portfolio.FixedPortfolio trades one contract regardless of circumstances, a simplistic approach not recommended for real use.
from haymaker import base, streamers, aggregators, signals, execution_models, portfolio, bracket_legs

pipe = base.Pipe(
    streamers.HistoricalDataStreamer(es_contract, "10 D", "1 hour", "TRADES"),
    aggregators.BarAggregator(aggregators.NoFilter()),
    EMACrossStrategy("ema_cross_ES", es_contract, 12, 48, 24),
    signals.BinarySignalProcessor(),
    portfolio.PortfolioWrapper(),
    execution_models.EventDrivenExecModel(
        stop=bracket_legs.TrailingStop(3, vol_field="atr")
    ),
)
The haymaker.base.Pipe connects components into an event-driven pipeline. Market data from the streamer triggers processing, potentially resulting in broker orders.
Components used:
- Historical data streamer¶
streamers.HistoricalDataStreamer(es_contract, "10 D", "1 hour", "TRADES")
Pulls 10 days of 1-hour “TRADES” data for es_contract from the broker, updating with new data points. The framework automatically fills gaps if disruptions occur.
- No-op aggregator¶
aggregators.BarAggregator(aggregators.NoFilter())
No aggregation or processing is applied (using haymaker.aggregators.NoFilter). An aggregator is currently required with historical data streamers to track history.
- EMA crossover strategy instance¶
EMACrossStrategy("ema_cross_ES", es_contract, 12, 48, 24)
Instantiates the strategy with arbitrary parameters: 12-hour and 48-hour EMAs, 24-hour ATR.
Note
These parameters are illustrative and not optimized.
- Binary signal processor¶
signals.BinarySignalProcessor()
The haymaker.signals.BinarySignalProcessor ensures:
* Repeated signals in the same direction are ignored if a position exists.
* After a stop-loss, reopening in the same direction is prevented until an opposite position is cycled.
* Positions are reversed on opposing signals.
- Portfolio wrapper¶
portfolio.PortfolioWrapper()
Connects to a single global haymaker.portfolio.FixedPortfolio instance, ensuring strategy-specific data flows correctly to downstream components.
- Event-driven execution model¶
execution_models.EventDrivenExecModel(
    stop=bracket_legs.TrailingStop(3, vol_field="atr")
)
Sends an ib_insync.order.MarketOrder to the broker. Upon fill, places a trailing stop-loss order with a distance of 3×ATR (requires an atr column from the haymaker.brick.AbstractDfBrick).
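To show what a trailing distance of 3×ATR means in price terms, the sketch below tracks the stop level bar by bar. It is an illustration under stated assumptions, not the logic of bracket_legs.TrailingStop: the function name trailing_stop_path is hypothetical, the ATR value is taken as fixed at the time of the fill, and in live trading the trailing itself would normally be handled by the broker's order type.

```python
def trailing_stop_path(prices, atr, multiple=3.0, direction=1):
    """Return the stop level after each price for a position opened at prices[0].

    direction is 1 for a long position and -1 for a short one.
    """
    distance = multiple * atr
    best = prices[0]  # most favourable price seen so far
    stops = []
    for price in prices:
        best = max(best, price) if direction == 1 else min(best, price)
        # The stop follows the best price but never moves back toward it.
        stops.append(best - direction * distance)
    return stops


# Long position, ATR of 2.0: the stop trails 6.0 below the highest price seen.
print(trailing_stop_path([100.0, 104.0, 102.0, 110.0], atr=2.0))
```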
from haymaker import app

if __name__ == "__main__":
    app.App().run()
Run this as a script (e.g., strategy.py):
python strategy.py
Ensure python is the correct interpreter. Long-running scripts should be managed as processes (see process management documentation—link TBD).
Conclusion¶
In real-world strategies, you’d trade multiple instruments and parameter sets. Using the patterns above with Python data structures, you can create pipelines for as many combinations as needed.
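One way to organize such combinations is to generate a list of plain parameter records first and build one pipeline per record. The sketch below only produces the records; strategy_specs is a hypothetical helper, the symbols and lookbacks are arbitrary, and giving each record a distinct strategy name is an assumption about how strategies are told apart.

```python
from itertools import product


def strategy_specs(symbols, fast_slow_pairs, atr_lookback=24):
    """Return one parameter record per (symbol, lookback pair) combination."""
    specs = []
    for symbol, (fast, slow) in product(symbols, fast_slow_pairs):
        specs.append({
            "strategy": f"ema_cross_{symbol}_{fast}_{slow}",  # distinct name
            "symbol": symbol,
            "fast_lookback": fast,
            "slow_lookback": slow,
            "atr_lookback": atr_lookback,
        })
    return specs


specs = strategy_specs(["ES", "NQ"], [(12, 48), (24, 96)])
for spec in specs:
    # Each record would feed one base.Pipe(...) built as in the example above,
    # e.g. a contract for spec["symbol"] and an EMACrossStrategy with the
    # lookbacks from the record.
    print(spec["strategy"])
```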