Execution Module

Haymaker trading algorithms consist of a series of Atom components, each implementing a step in a trading algorithm. These components are piped together in an event-driven fashion.

Common trading steps include:

  • Receiving price data.

  • Processing, aggregating, or filtering data.

  • Generating trading signals.

  • Managing the portfolio.

  • Controlling risk.

  • Managing execution.

Each processing component (called an “Atom”) inherits from haymaker.base.Atom.

Atom Object

class haymaker.base.Atom[source]

Abstract base object from which all other objects inherit. It’s a basic building block for creating strategies in Haymaker. Every Atom represents a processing step typically for one traded instrument, thus allowing for separation of concerns. Connecting Atoms creates a processing pipeline.

Atoms in a pipeline communicate with each other in an event-driven manner through three methods: onStart(), onData(), onFeedback(). Those methods are called when appropriate eventkit.event.Event objects are emitted (respectively startEvent, dataEvent, feedbackEvent), and emit their own events when they are done processing thus sending a signal to the next Atom in the pipeline that it can start processing.

Users are free to put in those methods any processing logic they want, using any libraries and tools required. Atoms can be connected in any order; unions can be created by connecting more than one Atom; pipelines can be created using auxiliary class Pipeline.
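The event flow described above can be sketched with plain-Python stand-ins. MiniEvent and MiniAtom are hypothetical simplifications written for this illustration; they are not eventkit.Event or haymaker.base.Atom, and they leave out feedback events, contracts and async handling.

```python
class MiniEvent:
    """Hypothetical stand-in for eventkit.Event: a list of callbacks."""

    def __init__(self):
        self._listeners = []

    def connect(self, listener):
        self._listeners.append(listener)

    def emit(self, *args):
        for listener in self._listeners:
            listener(*args)


class MiniAtom:
    """Hypothetical stand-in for Atom with start and data events only."""

    def __init__(self, name):
        self.name = name
        self.startEvent = MiniEvent()
        self.dataEvent = MiniEvent()

    def connect(self, target):
        # Wire this atom's events to the target's handler methods.
        self.startEvent.connect(target.onStart)
        self.dataEvent.connect(target.onData)
        return self

    def onStart(self, data):
        self.startEvent.emit(data)

    def onData(self, data):
        # Record that this step ran, then hand over to the next atom.
        data.setdefault("visited", []).append(self.name)
        self.dataEvent.emit(data)


streamer = MiniAtom("streamer")
brick = MiniAtom("brick")
portfolio = MiniAtom("portfolio")
streamer.connect(brick)
brick.connect(portfolio)

payload = {"price": 101.5}
streamer.onData(payload)
print(payload["visited"])
```

Each step only knows about its own events, which is what lets the steps be rearranged without touching their internals.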

contract

The contract object associated with this Atom. This can be any ib_insync.contract.Contract. On startup, this contract will be qualified, and the available ib_insync.contract.ContractDetails will be downloaded from the broker and made available through the details attribute. If contract is an ib_insync.contract.ContFuture or ib_insync.contract.Future, it will be replaced with the on-the-run ib_insync.contract.Future. ContFuture will pick the contract that IB considers to be current; Future allows for customization by tweaking FutureSelector. Whichever method is chosen, onContractChanged() will be called when the contract is about to be rolled.

This attribute doesn’t need to be set. If this Atom object is not related to any one particular contract, just don’t assign any value to this attribute.

Type:

ib_insync.contract.Contract

which_contract

Default: ACTIVE. If NEXT is chosen, contract will return the next contract in the chain (relevant only for expiring contracts like futures or options). This allows upcoming contracts to be used for new positions a short period before they become active (the number of days prior to expiry during which NEXT will be used can be set in config).

Type:

ActiveNext

ib

The instance of the ib_insync.ib.IB client used for interacting with the broker. It can be used to communicate with the broker if necessary.

Type:

ClassVar[ibi.IB]

sm

Access to StateMachine which is Haymaker’s central collection of information about current positions, orders and state of strategies.

Type:

ClassVar[StateMachine]

contracts

A collection of all contracts currently in use.

Type:

ClassVar[list[ibi.Contract]]

events

Collection of eventkit.Event objects used by Haymaker, i.e. startEvent, dataEvent, feedbackEvent. The appropriate methods should use these events to communicate with other objects in the chain, e.g. onData(), after processing incoming data, should pass the result to the next object by calling self.dataEvent.emit(data).

Type:

ClassVar[Sequence[str]]

connect(*targets)[source]

Connect appropriate events and methods to subsequent Atom object(s) in the chain. Shorthand for this method is +=

Parameters:

targets (Atom) – One or more Atom objects to connect to. If more than one object passed, they will be connected directly in a one-to-many fashion. If the intention is to create a chain of objects, use pipe() instead.

Returns:

The updated Atom object.

Return type:

Atom

property details: Details | list[Details] | DetailsContainer

Contract details received from the broker.

If contract is set, details will be returned only for this contract; otherwise details will return a dictionary of all available details, i.e. dict[ibi.Contract, Details].

Details is a wrapper around ib_insync.contract.ContractDetails with some additional methods and attributes.

disconnect(*targets)[source]

Disconnect passed Atom objects, which are directly connected to this atom. Shorthand for this method is -=

Parameters:

targets (Atom) – One or more Atom objects to disconnect from.

Return type:

Self

onContractChanged(old_contract, new_contract)[source]

Will be called if contract object on self.contract changes. In particular, this happens when future contract is about to expire, and new on-the-run contract replaces old, expiring contract. This method should be used to initialize any adjustment required on the object in relation to contract rolling. Actual position rolling is taken care of by Controller object.

Return type:

Optional[Awaitable[None]]

onData(data, *args)[source]

Connected to dataEvent of the preceding object in the chain. This is the entry point to any processing performed by this object. The result of this processing should be added to the data dict and passed to the subsequent object in the chain using dataEvent (by calling self.dataEvent.emit(data)).

It’s up to the user to emit dataEvent with appropriate data. This event will NOT be emitted by the system, so if it’s not properly implemented, the event chain will be broken. This method must be overridden in a subclass.

Calling superclass on exit will add a timestamp with object’s name to data, which may be useful for logging.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

Optional[Awaitable[None]]
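How a caller might treat synchronous and asynchronous handlers alike can be sketched as follows. The dispatch helper is hypothetical and only illustrates the general asyncio pattern; it is not how Haymaker itself schedules handlers.

```python
import asyncio
import inspect


def dispatch(handler, data, background_tasks):
    """Call handler; if it returned an awaitable, schedule it on the running loop."""
    result = handler(data)
    if inspect.isawaitable(result):
        background_tasks.append(asyncio.ensure_future(result))


def sync_on_data(data):
    # A plain function finishes its work before dispatch() returns.
    data["sync"] = True


async def async_on_data(data):
    # A coroutine function only runs once the loop gets to it.
    await asyncio.sleep(0)
    data["async"] = True


async def main():
    data, tasks = {}, []
    dispatch(sync_on_data, data, tasks)
    dispatch(async_on_data, data, tasks)
    await asyncio.gather(*tasks)
    return data


result = asyncio.run(main())
print(result)
```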

onFeedback(data, *args)[source]

Connected to feedbackEvent of the subsequent object in the chain. Allows for passing of information about trading results. Using it is optional; if used, the overridden method must emit feedbackEvent with appropriate data. If not overridden, it will just pass received data to the previous object in the chain.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onFeedback(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

Optional[Awaitable[None]]

onStart(data, *args)[source]

Perform any initialization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.

First Atom in a pipeline (typically a data streamer) will be called by system, which is an indication that (re)start is in progress and we have successfully connected to the broker.

data by default is a dict and all keys on this dict are being set as properties on the object. Any information that needs to be passed to atoms down the chain, should be appended to data without removing any existing keys.

If overriding this method, call the superclass; the call to super().onStart(data) should be the last line in the overridden method. Don’t manually emit startEvent in the subclass.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onStart(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None
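The handling of data on start can be illustrated with a small stand-in. StartAware is a hypothetical class that only mimics the "keys become attributes" behaviour described above; it does not emit startEvent and is not the real Atom.

```python
class StartAware:
    """Hypothetical stand-in: every key of `data` becomes an attribute."""

    def onStart(self, data):
        for key, value in data.items():
            setattr(self, key, value)


class MyStep(StartAware):
    def onStart(self, data):
        # Add information for atoms further down the chain,
        # keeping every existing key intact.
        data["lookback"] = 20
        # The superclass call goes last, as recommended above.
        super().onStart(data)


start_data = {"strategy": "demo"}
step = MyStep()
step.onStart(start_data)
print(step.strategy, step.lookback)
```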

pipe(*targets)[source]

Create a Pipe or a chain of connected Atom objects. Only first target will be directly connected to this object, second target will be connected to the first target and so on. It’s different from connect() method, which connects all targets directly to this object.

Returns:

Pipe object with all targets connected in a chain, where this object is the first element and the last of the passed targets is the last element.

Return type:

Pipe
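The difference between connect() and pipe() comes down to topology. The sketch below uses a hypothetical Node class that only tracks direct connections; its pipe() returns a plain list rather than a Pipe object.

```python
class Node:
    """Hypothetical stand-in for Atom that only tracks direct connections."""

    def __init__(self, name):
        self.name = name
        self.targets = []

    def connect(self, *targets):
        # One-to-many: every target hangs directly off this node.
        self.targets.extend(targets)
        return self

    def pipe(self, *targets):
        # Chain: each node is connected only to the one that follows it.
        current = self
        for target in targets:
            current.connect(target)
            current = target
        return [self, *targets]


a, b, c = Node("a"), Node("b"), Node("c")
a.connect(b, c)
fan_out = [t.name for t in a.targets]

x, y, z = Node("x"), Node("y"), Node("z")
chain = [n.name for n in x.pipe(y, z)]
print(fan_out, chain)
```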

Auxiliary Objects

class haymaker.base.Pipe(*targets)[source]

Auxiliary object for connecting several Atom objects. Atoms to be connected need to be passed in the right order at initialization. Pipe itself is a subclass of Atom, so it can be connected to other Atom (or Pipe) objects, and all Atom attributes and methods are available.

connect(*targets)[source]

Connect appropriate events and methods to subsequent Atom object(s) in the chain. Shorthand for this method is +=

Parameters:

targets (Atom) – One or more Atom objects to connect to. If more than one object passed, they will be connected directly in a one-to-many fashion. If the intention is to create a chain of objects, use pipe() instead.

Returns:

The updated Atom object.

Return type:

Atom

disconnect(*targets)[source]

Disconnect passed Atom objects, which are directly connected to this atom. Shorthand for this method is -=

Parameters:

targets (Atom) – One or more Atom objects to disconnect from.

Return type:

Self

onData(data, *args)[source]

Connected to dataEvent of the preceding object in the chain. This is the entry point to any processing performed by this object. The result of this processing should be added to the data dict and passed to the subsequent object in the chain using dataEvent (by calling self.dataEvent.emit(data)).

It’s up to the user to emit dataEvent with appropriate data. This event will NOT be emitted by the system, so if it’s not properly implemented, the event chain will be broken. This method must be overridden in a subclass.

Calling superclass on exit will add a timestamp with object’s name to data, which may be useful for logging.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None

onFeedback(data, *args)[source]

Connected to feedbackEvent of the subsequent object in the chain. Allows for passing of information about trading results. Using it is optional; if used, the overridden method must emit feedbackEvent with appropriate data. If not overridden, it will just pass received data to the previous object in the chain.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onFeedback(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None

onStart(data, *args)[source]

Perform any initialization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.

First Atom in a pipeline (typically a data streamer) will be called by system, which is an indication that (re)start is in progress and we have successfully connected to the broker.

data by default is a dict and all keys on this dict are being set as properties on the object. Any information that needs to be passed to atoms down the chain, should be appended to data without removing any existing keys.

If overriding this method, call the superclass; the call to super().onStart(data) should be the last line in the overridden method. Don’t manually emit startEvent in the subclass.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onStart(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None

class haymaker.base.Details(details)[source]

Wrapper object for ib_insync.contract.ContractDetails extracting and processing information that’s most relevant for Haymaker.

details

The original contract details object.

Type:

ibi.ContractDetails

trading_hours

List of tuples with start and end of trading hours for this contract.

Type:

list[tuple[datetime, datetime]]

liquid_hours

List of tuples with start and end of liquid hours for this contract.

Type:

list[tuple[datetime, datetime]]

is_liquid(_now=None)[source]

Given current time check if the market is during liquid hours for underlying contract.

Parameters:
  • _now (Optional[datetime], optional) – If not provided, current time will be used. The only situation when it’s useful to provide _now is in testing. Defaults to None.

Returns:

True if market is liquid, False otherwise.

Return type:

bool

is_open(_now=None)[source]

Given current time check if the market is open for underlying contract.

Parameters:
  • _now (Optional[datetime], optional) – If not provided, current time will be used. The only situation when it’s useful to provide _now is in testing. Defaults to None.

Returns:

True if market is open, False otherwise.

Return type:

bool
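A check of this kind reduces to testing whether a timestamp falls inside any (start, end) pair. The sketch below is an assumption about the general approach, not the Details implementation; the session times are made up, and treating the start as inclusive and the end as exclusive is a choice made here.

```python
from datetime import datetime, timezone

UTC = timezone.utc

# Hypothetical sessions in the shape of trading_hours: (start, end) pairs.
sessions = [
    (datetime(2024, 1, 2, 14, 30, tzinfo=UTC), datetime(2024, 1, 2, 21, 0, tzinfo=UTC)),
    (datetime(2024, 1, 3, 14, 30, tzinfo=UTC), datetime(2024, 1, 3, 21, 0, tzinfo=UTC)),
]


def is_open(sessions, _now=None):
    """True if _now (default: current UTC time) lies inside any session."""
    now = _now if _now is not None else datetime.now(UTC)
    return any(start <= now < end for start, end in sessions)


during = is_open(sessions, datetime(2024, 1, 2, 15, 0, tzinfo=UTC))
overnight = is_open(sessions, datetime(2024, 1, 2, 23, 0, tzinfo=UTC))
print(during, overnight)
```

Passing _now explicitly, as the parameter description suggests, is what makes the function testable without patching the clock.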

next_open(_now=None)[source]

Return time of nearest market re-open (regardless if market is open now). Should be used after it has been tested that is_active() is False.

Parameters:
  • _now (Optional[datetime], optional) – If not provided, current time will be used. The only situation when it’s useful to provide _now is in testing. Defaults to None.

Return type:

datetime | None
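Finding the nearest re-open can be sketched as picking the earliest session start that lies in the future. This is an illustrative assumption about the logic, with made-up session times; the real method may handle edge cases differently.

```python
from datetime import datetime, timezone

UTC = timezone.utc

# Hypothetical sessions in the shape of trading_hours: (start, end) pairs.
sessions = [
    (datetime(2024, 1, 2, 14, 30, tzinfo=UTC), datetime(2024, 1, 2, 21, 0, tzinfo=UTC)),
    (datetime(2024, 1, 3, 14, 30, tzinfo=UTC), datetime(2024, 1, 3, 21, 0, tzinfo=UTC)),
]


def next_open(sessions, _now=None):
    """Earliest session start after _now, or None if no session is left."""
    now = _now if _now is not None else datetime.now(UTC)
    upcoming = [start for start, _end in sessions if start > now]
    return min(upcoming) if upcoming else None


while_open = next_open(sessions, datetime(2024, 1, 2, 15, 0, tzinfo=UTC))
after_last = next_open(sessions, datetime(2024, 1, 4, 0, 0, tzinfo=UTC))
print(while_open, after_last)
```

The first call is made while a session is in progress and still returns the following day's open, matching the "regardless if market is open now" wording.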

Strategy Building Components

Building on haymaker.base.Atom, Haymaker offers skeletons of several components addressing typical requirements in building trading strategies:

  • Streamer: Connects to the broker and pipes market data.

  • Aggregator: Custom aggregation or processing of market data before signal generation.

  • Brick: Generates trading signals; this is the core building block of strategies (like bricks in a house).

    Note

    If you have an idea for a better name, email me! :)

  • Signal Processor: Filters or processes signals based on strategy state or auxiliary data, determining whether signals should trigger orders.

  • Portfolio: A global object that receives processed signals and translates them into allocations (e.g., amounts of instruments to trade). It uses data like account value, holdings, volatility, risk targets, and concentration limits.

  • Execution Model: Issues actual broker orders based on target instrument amounts.

Below is a review of how to use pre-built component modules:

Streamer

Haymaker provides streamers corresponding to all ib_insync market data subscriptions:

ib_insync Method          Streamer

reqHistoricalDataAsync    HistoricalDataStreamer

reqMktData                MktDataStreamer

reqRealTimeBars           RealTimeBarsStreamer

reqTickByTickData         TickByTickStreamer

All streamers extend haymaker.streamers.Streamer.

class haymaker.streamers.Streamer[source]
classmethod awaitables()[source]

Coroutines from all instantiated streamers. Can be passed to asyncio.gather()

Return type:

list[Awaitable]
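The pattern of collecting coroutines from every instance and running them together can be sketched as below. MiniStreamer is a hypothetical stand-in that keeps its own instance registry; how the real Streamer tracks instances is not shown in this reference.

```python
import asyncio


class MiniStreamer:
    """Hypothetical stand-in that remembers every instance created."""

    instances = []

    def __init__(self, name):
        self.name = name
        self.ticks = 0
        MiniStreamer.instances.append(self)

    async def run(self):
        # A real streamer would subscribe to market data here.
        for _ in range(3):
            await asyncio.sleep(0)
            self.ticks += 1
        return self.name

    @classmethod
    def awaitables(cls):
        return [instance.run() for instance in cls.instances]


async def main():
    MiniStreamer("ES")
    MiniStreamer("NQ")
    # gather() runs all streamers concurrently and keeps the input order.
    return await asyncio.gather(*MiniStreamer.awaitables())


finished = asyncio.run(main())
print(finished)
```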

onStart(data, *args)[source]

Perform any initialization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.

First Atom in a pipeline (typically a data streamer) will be called by system, which is an indication that (re)start is in progress and we have successfully connected to the broker.

data by default is a dict and all keys on this dict are being set as properties on the object. Any information that needs to be passed to atoms down the chain, should be appended to data without removing any existing keys.

If overriding this method, call the superclass; the call to super().onStart(data) should be the last line in the overridden method. Don’t manually emit startEvent in the subclass.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onStart(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None

async run()[source]

Start subscription and start emitting data. This is the main entry point into the streamer.

Return type:

None

Implementations

Every implementation accepts the same arguments as the respective ib_insync method it wraps, plus standard haymaker.streamers.Streamer parameters.

class haymaker.streamers.HistoricalDataStreamer(contract, durationStr, barSizeSetting, whatToShow, useRTH=False, formatDate=2, incremental_only=True, startup_seconds=5, _last_bar_date=None, _future_adjust_flag=False, _adjusted=<factory>)[source]
date_to_delta(date)[source]

Return number of bars (as per barSizeSetting) since date. Used to determine number of bars required to backfill since last reset.

Return type:

int
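Counting bars since a given date is, at its simplest, elapsed time divided by bar length. The helper below is a rough sketch under that assumption: bars_since and bar_seconds are hypothetical names, only a few IB-style bar size strings are handled, and market closures are ignored, so the real method may return different numbers.

```python
from datetime import datetime

# Seconds per unit for IB-style bar sizes such as "30 secs" or "1 hour".
UNIT_SECONDS = {"sec": 1, "min": 60, "hour": 3600, "day": 86400}


def bar_seconds(bar_size_setting):
    """Length of one bar in seconds, e.g. "5 mins" -> 300."""
    number, unit = bar_size_setting.split()
    return int(number) * UNIT_SECONDS[unit.rstrip("s")]


def bars_since(date, bar_size_setting, now):
    """Whole bars that fit between date and now (never negative)."""
    elapsed = (now - date).total_seconds()
    return max(0, int(elapsed // bar_seconds(bar_size_setting)))


last_bar = datetime(2024, 1, 2, 10, 0)
now = datetime(2024, 1, 2, 11, 0)
print(bars_since(last_bar, "5 mins", now))
```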

onContractChanged(old_contract, new_contract)[source]

Will be called if contract object on self.contract changes. In particular, this happens when future contract is about to expire, and new on-the-run contract replaces old, expiring contract. This method should be used to initialize any adjustment required on the object in relation to contract rolling. Actual position rolling is taken care of by Controller object.

onStart(data, *args)[source]

Perform any initialization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.

First Atom in a pipeline (typically a data streamer) will be called by system, which is an indication that (re)start is in progress and we have successfully connected to the broker.

data by default is a dict and all keys on this dict are being set as properties on the object. Any information that needs to be passed to atoms down the chain, should be appended to data without removing any existing keys.

If overriding this method, call the superclass; the call to super().onStart(data) should be the last line in the overridden method. Don’t manually emit startEvent in the subclass.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onStart(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None

async run()[source]

Start subscription and start emitting data. This is the main entry point into the streamer.

Return type:

None

class haymaker.streamers.MktDataStreamer(contract, tickList)[source]
class haymaker.streamers.RealTimeBarsStreamer(contract, whatToShow, useRTH, incremental_only=True)[source]
onStart(data, *args)[source]

Perform any initialization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.

First Atom in a pipeline (typically a data streamer) will be called by system, which is an indication that (re)start is in progress and we have successfully connected to the broker.

data by default is a dict and all keys on this dict are being set as properties on the object. Any information that needs to be passed to atoms down the chain, should be appended to data without removing any existing keys.

If overriding this method, call the superclass; the call to super().onStart(data) should be the last line in the overridden method. Don’t manually emit startEvent in the subclass.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onStart(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None

class haymaker.streamers.TickByTickStreamer(contract, tickType, numberOfTicks=0, ignoreSize=False)[source]

Aggregator

class haymaker.aggregators.BarAggregator(filter, incremental_only=False, future_adjust_type='add', debug=False)[source]

Aggregate received data bars into new bars based on the criteria specified in filter. Store processed data.

adjust_future(new_bar)[source]

Create a continuous future price series on future contract change. IB mechanism cannot be trusted. This feature can be turned off by passing future_adjust_type=None when instantiating the class.
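One common way to build a continuous series is additive back-adjustment: when the contract changes, all stored prices are shifted by the gap between the new and the old contract. Whether future_adjust_type='add' works exactly like this is an assumption; back_adjust_add is a hypothetical helper and the prices are made up.

```python
def back_adjust_add(history, old_close, new_close):
    """Shift stored prices by the gap between new and old contract prices."""
    offset = new_close - old_close
    return [price + offset for price in history]


# Prices recorded on the expiring contract, which last traded at 102.0
# while the new on-the-run contract trades at 104.5.
old_series = [100.0, 101.0, 102.0]
adjusted = back_adjust_add(old_series, old_close=102.0, new_close=104.5)
print(adjusted)
```

Additive adjustment keeps price differences between bars unchanged, at the cost of distorting percentage returns far back in the series.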

onData(data, *args)[source]

This passes correct data to self._filter.

Return type:

None

onDataBar(bars, *args)[source]

This is connected to self._filter, will emit whatever comes from filter.

onStart(data, *args)[source]

Perform any initialization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.

First Atom in a pipeline (typically a data streamer) will be called by system, which is an indication that (re)start is in progress and we have successfully connected to the broker.

data by default is a dict and all keys on this dict are being set as properties on the object. Any information that needs to be passed to atoms down the chain, should be appended to data without removing any existing keys.

If overriding this method, call the superclass; the call to super().onStart(data) should be the last line in the overridden method. Don’t manually emit startEvent in the subclass.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onStart(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Available Filters

class haymaker.aggregators.CountBars(count, source=None, *, label='')[source]
on_source(new_bar, *args)[source]

Emit a new value to all connected listeners.

Parameters:

args – Argument values to emit to listeners.

Return type:

None
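The idea behind a count-based filter can be sketched as buffering incoming bars and emitting one merged bar every count inputs. Bar and CountAggregator below are hypothetical stand-ins for illustration; the real CountBars builds on eventkit and its bar type and merge rules may differ.

```python
from dataclasses import dataclass


@dataclass
class Bar:
    open: float
    high: float
    low: float
    close: float
    volume: float


def merge(bars):
    """Collapse consecutive bars into one OHLCV bar."""
    return Bar(
        open=bars[0].open,
        high=max(b.high for b in bars),
        low=min(b.low for b in bars),
        close=bars[-1].close,
        volume=sum(b.volume for b in bars),
    )


class CountAggregator:
    """Hypothetical filter: one output bar for every `count` input bars."""

    def __init__(self, count):
        self.count = count
        self._buffer = []
        self.emitted = []

    def on_source(self, new_bar):
        self._buffer.append(new_bar)
        if len(self._buffer) == self.count:
            self.emitted.append(merge(self._buffer))
            self._buffer = []


aggregator = CountAggregator(count=2)
for bar in [
    Bar(1.0, 2.0, 0.5, 1.5, 10.0),
    Bar(1.5, 3.0, 1.0, 2.5, 20.0),
    Bar(2.5, 2.6, 2.0, 2.2, 5.0),
    Bar(2.2, 4.0, 2.1, 3.9, 15.0),
]:
    aggregator.on_source(bar)
print(aggregator.emitted)
```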

class haymaker.aggregators.VolumeBars(volume, source=None, *, label='')[source]
on_source(new_bar, *args)[source]

Emit a new value to all connected listeners.

Parameters:

args – Argument values to emit to listeners.

Return type:

None

class haymaker.aggregators.TickBars(count, source=None, *, label='')[source]
on_source(new_bar, *args)[source]

Emit a new value to all connected listeners.

Parameters:

args – Argument values to emit to listeners.

Return type:

None

class haymaker.aggregators.TimeBars(timer, source=None, *, label='')[source]
on_source(new_bar, *args)[source]

Emit a new value to all connected listeners.

Parameters:

args – Argument values to emit to listeners.

Return type:

None

class haymaker.aggregators.NoFilter(source=None, *, label='')[source]

This works as an accumulator making sure that no bars are lost during restarts.

on_source(new_bar, *args)[source]

Emit a new value to all connected listeners.

Parameters:

args – Argument values to emit to listeners.

Return type:

None

Brick

class haymaker.brick.AbstractBaseBrick(strategy, contract)[source]
onData(data, *args)[source]

Connected to dataEvent of the preceding object in the chain. This is the entry point to any processing performed by this object. The result of this processing should be added to the data dict and passed to the subsequent object in the chain using dataEvent (by calling self.dataEvent.emit(data)).

It’s up to the user to emit dataEvent with appropriate data. This event will NOT be emitted by the system, so if it’s not properly implemented, the event chain will be broken. This method must be overridden in a subclass.

Calling superclass on exit will add a timestamp with object’s name to data, which may be useful for logging.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None

onStart(data, *args)[source]

Perform any initialization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.

First Atom in a pipeline (typically a data streamer) will be called by system, which is an indication that (re)start is in progress and we have successfully connected to the broker.

data by default is a dict and all keys on this dict are being set as properties on the object. Any information that needs to be passed to atoms down the chain, should be appended to data without removing any existing keys.

If overriding this method, call the superclass; the call to super().onStart(data) should be the last line in the overridden method. Don’t manually emit startEvent in the subclass.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onStart(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Implementations

class haymaker.brick.AbstractDfBrick(strategy, contract)[source]

Override haymaker.brick.AbstractDfBrick.df() when creating a concrete AbstractDfBrick.

Signal Processor

To better separate concerns, filter signals received from haymaker.brick.AbstractBaseBrick based on strategy state (e.g., avoid repeated signals or re-entering after a stop-out). While this could be done in haymaker.brick.AbstractBaseBrick, using a haymaker.signals.BinarySignalProcessor is more modular.

Available processors are designed for binary signals (on/off switches). For discrete signals (e.g., 0–10 strength), these implementations aren’t suitable, but you can develop custom ones.

It’s easiest to create a binary signal processor by implementing haymaker.signals.AbstractBaseBinarySignalProcessor.

class haymaker.signals.AbstractBaseBinarySignalProcessor(signal_fields='signal', state_machine=None)[source]

Process binary signals, i.e. long/short/off, as opposed to discrete signals, where signal strength is meaningful (e.g. signal assuming values -10…10 based on strength of conviction).

Actual position size or even whether the position should be taken at all is not determined here, it’s the job of Portfolio.

Actual meaning of signals is defined in sub-classes, by overriding methods: process_position() and process_no_position().

Whatever the meaning of the signal coming in, signal coming out means strategy wants to take action in the direction of signal, as indicated by keys action and signal in the emitted dict. Incoming signals that don’t require any action will be stopped here and not propagated down the chain.

In sub-class names, blip means that a zero signal should be ignored; otherwise absence of signal means there should be no position.

Parameters:

  • signal_fields – if str, a single field of this name is used as both signal in (open position) and signal out (close position); if tuple, the first element is signal in and the second is signal out

  • state_machine – for testing only; should not be passed in a non-testing environment
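The two accepted shapes of signal_fields can be normalized as in the sketch below. resolve_signal_fields is a hypothetical helper written for illustration, and the field names "entry" and "exit" are made up; the class may store these values differently.

```python
def resolve_signal_fields(signal_fields="signal"):
    """Return (signal_in_field, signal_out_field) for either accepted shape."""
    if isinstance(signal_fields, str):
        # One field serves as both the opening and the closing signal.
        return signal_fields, signal_fields
    signal_in, signal_out = signal_fields
    return signal_in, signal_out


print(resolve_signal_fields())
print(resolve_signal_fields(("entry", "exit")))
```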

onData(data, *args)[source]

Connected to dataEvent of the preceding object in the chain. This is the entry point to any processing performed by this object. The result of this processing should be added to the data dict and passed to the subsequent object in the chain using dataEvent (by calling self.dataEvent.emit(data)).

It’s up to the user to emit dataEvent with appropriate data. This event will NOT be emitted by the system, so if it’s not properly implemented, the event chain will be broken. This method must be overridden in a subclass.

Calling superclass on exit will add a timestamp with object’s name to data, which may be useful for logging.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None

position(strategy)[source]

Which side of the market is position on: (short: -1, long: 1, no position: 0)

Return type:

Literal[-1, 0, 1]

same_direction(strategy, signal)[source]

Is signal and position in the same direction?

Return type:

bool

Implementations

class haymaker.signals.BinarySignalProcessor(signal_fields='signal', state_machine=None)[source]
  • Zero signal means close position if position exists

  • Non-zero signal means:

      ◦ open new position if there is no position for the strategy

      ◦ ignore signal if it’s in the same direction as existing position

      ◦ close position if the signal is in the direction opposite to existing position

class haymaker.signals.BlipBinarySignalProcessor(signal_fields='signal', state_machine=None)[source]
  • Zero signal means do nothing

  • Non-zero signal means:

      ◦ open new position if there is no position for the strategy

      ◦ ignore signal if it’s in the same direction as existing position

      ◦ close position if the signal is in the direction opposite to existing position
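The rules listed for BinarySignalProcessor and BlipBinarySignalProcessor can be written as a single decision function. binary_action is a hypothetical helper; 'CLOSE' is the action name used elsewhere in this reference, while 'OPEN' and the None return for "no action" are assumptions made for the sketch.

```python
def binary_action(position, signal, blip=False):
    """Map current position and incoming signal (-1, 0 or 1) to an action.

    Returns "OPEN", "CLOSE", or None when the signal requires no action.
    """
    if signal == 0:
        # The blip variant ignores zero signals; otherwise zero closes.
        if blip or position == 0:
            return None
        return "CLOSE"
    if position == 0:
        return "OPEN"
    if signal == position:
        return None
    return "CLOSE"


print(binary_action(position=0, signal=1))
print(binary_action(position=1, signal=-1))
print(binary_action(position=1, signal=0, blip=True))
```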

class haymaker.signals.LockableBinarySignalProcessor(signal_fields='signal', state_machine=None)[source]
  • Signals in the direction of last position are ignored (one side of the market is ‘locked’). It’s up to StateMachine to determine which side is ‘locked’ based on position actually taken in the market (not just previously generated signals).

  • Zero signal means close position if position exists

  • Non-zero signal means:

      ◦ open new position if there is no position for the strategy

      ◦ ignore signal if it’s in the same direction as existing position

      ◦ close position if the signal is in the direction opposite to existing position

onData(data, *args)[source]

Connected to dataEvent of the preceding object in the chain. This is the entry point to any processing performed by this object. The result of this processing should be added to the data dict and passed to the subsequent object in the chain using dataEvent (by calling self.dataEvent.emit(data)).

It’s up to the user to emit dataEvent with appropriate data. This event will NOT be emitted by the system, so if it’s not properly implemented, the event chain will be broken. This method must be overridden in a subclass.

Calling superclass on exit will add a timestamp with object’s name to data, which may be useful for logging.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None

class haymaker.signals.LockableBlipBinarySignalProcessor(signal_fields='signal', state_machine=None)[source]
  • Signals in the direction of last position are ignored (one side of the market is ‘locked’). It’s up to StateMachine to determine which side is ‘locked’ based on position actually taken in the market (not just previously generated signals).

  • Zero signal means do nothing

  • Non-zero signal means:

      ◦ open new position if there is no position for the strategy

      ◦ ignore signal if it’s in the same direction as existing position

      ◦ close position if the signal is in the direction opposite to existing position

class haymaker.signals.AlwaysOnLockableBinarySignalProcessor(signal_fields='signal', state_machine=None)[source]
  • Signals in the direction of last position are ignored (one side of the market is ‘locked’). It’s up to StateMachine to determine which side is ‘locked’ based on position actually taken in the market (not just previously generated signals).

  • Zero signal means close position if position exists

  • Non-zero signal means:

      ◦ open new position if there is no position for the strategy

      ◦ ignore signal if it’s in the same direction as existing position

      ◦ reverse position if the signal is in the direction opposite to existing position

class haymaker.signals.AlwaysOnBinarySignalProcessor(signal_fields='signal', state_machine=None)[source]
  • Zero signal means close position if position exists

  • Non-zero signal means:

      ◦ open new position if there is no position for the strategy

      ◦ ignore signal if it’s in the same direction as existing position

      ◦ close position if the signal is in the direction opposite to existing position

Factory Function

haymaker.signals.binary_signal_processor_factory(lockable=False, always_on=False)[source]

Helper function to return appropriate class based on parameters.

Parameters:
  • lockable – if True, no signals in the direction of the last position are allowed if the last position was stopped out (they are allowed if the position was closed through means other than stop-loss)

  • always_on – if True, a ‘CLOSE’ signal also opens a position in the reverse direction

Return type:

Type[AbstractBaseBinarySignalProcessor]
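A factory of this kind can be sketched as a lookup keyed on the two flags. The classes below are empty stand-ins named after the documented ones, and the mapping from flags to classes is inferred from the class names alone, so it should be read as an assumption rather than the library's actual selection logic.

```python
# Empty stand-ins named after the documented classes; the real ones live
# in haymaker.signals and carry the actual processing logic.
class BinarySignalProcessor:
    pass


class LockableBinarySignalProcessor:
    pass


class AlwaysOnBinarySignalProcessor:
    pass


class AlwaysOnLockableBinarySignalProcessor:
    pass


# Assumed mapping: (lockable, always_on) -> class.
_REGISTRY = {
    (False, False): BinarySignalProcessor,
    (True, False): LockableBinarySignalProcessor,
    (False, True): AlwaysOnBinarySignalProcessor,
    (True, True): AlwaysOnLockableBinarySignalProcessor,
}


def processor_factory(lockable=False, always_on=False):
    """Return the class (not an instance) matching the two flags."""
    return _REGISTRY[(bool(lockable), bool(always_on))]


print(processor_factory(lockable=True).__name__)
```

Returning the class rather than an instance leaves the caller free to pass constructor arguments such as signal_fields.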

Portfolio

class haymaker.portfolio.AbstractBasePortfolio(*args, **kwargs)[source]

Decides what, if and how much to trade based on received signals and queries to [SM?].

Each strategy should have its own instance of portfolio to ensure that signals from various strategies are not mixed up. The actual singleton portfolio object should be passed to those instances, which should delegate allocation to this object.

abstractmethod allocate(data)[source]

Return desired position size in contracts. Interpretation of this number is up to execution model.

Return type:

float

Implementations

class haymaker.portfolio.FixedPortfolio(amount=1)[source]
allocate(data)[source]

Return desired position size in contracts. Interpretation of this number is up to execution model.

Return type:

float

Wrapper

class haymaker.portfolio.PortfolioWrapper[source]
onData(data, *args)[source]

Connected to dataEvent of the preceding object in the chain. This is the entry point to any processing performed by this object. The result of this processing should be added to the data dict and passed to the subsequent object in the chain using dataEvent (by calling self.dataEvent.emit(data)).

It’s up to the user to emit dataEvent with appropriate data; this event will NOT be emitted by the system, so if it’s not properly implemented, the event chain will be broken. This method must be overridden in a subclass.

Calling superclass on exit will add a timestamp with object’s name to data, which may be useful for logging.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None
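
To illustrate the difference between a synchronous and an asynchronous handler, here is a minimal sketch using only the standard library. How Haymaker schedules async handlers internally is not shown in this section; the `dispatch` function and both handlers are hypothetical stand-ins, and only standard `asyncio` and `inspect` calls are used.

```python
import asyncio
import inspect


def dispatch(handler, data):
    """Call handler now if it is sync; schedule it on the loop if async."""
    if inspect.iscoroutinefunction(handler):
        # Must be called while an event loop is running.
        return asyncio.ensure_future(handler(data))
    handler(data)
    return None


received = []


def sync_on_data(data):
    received.append(("sync", data["price"]))


async def async_on_data(data):
    await asyncio.sleep(0)  # stands in for real asynchronous work
    received.append(("async", data["price"]))


async def main():
    dispatch(sync_on_data, {"price": 1})
    task = dispatch(async_on_data, {"price": 2})
    await task  # wait so the example finishes deterministically


asyncio.run(main())
```

The synchronous handler runs immediately, while the asynchronous one only runs once the event loop gets control.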

An instance of haymaker.portfolio.AbstractBasePortfolio should never be directly included in a processing pipeline, as there should be only one portfolio for all strategies. Instead, include an instance of haymaker.portfolio.PortfolioWrapper in a pipeline. As long as an instance of haymaker.portfolio.AbstractBasePortfolio exists in your package, haymaker.portfolio.PortfolioWrapper ensures it’s connected.
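
The relationship described above can be pictured with stand-in classes. The sketch below does not use Haymaker: `SharedPortfolio` and `Wrapper` are hypothetical names, and the way the wrapper locates the shared instance (a class attribute) is an assumption chosen for brevity, not a description of how PortfolioWrapper finds its portfolio.

```python
class SharedPortfolio:
    """Stand-in for a single portfolio shared by all strategies (hypothetical)."""

    instance = None  # the one shared instance, set on creation

    def __init__(self, amount: float = 1) -> None:
        self.amount = amount
        SharedPortfolio.instance = self

    def allocate(self, data: dict) -> float:
        # Fixed allocation: the same size regardless of the data received.
        return self.amount


class Wrapper:
    """Stand-in for a per-strategy pipeline component (hypothetical)."""

    def on_data(self, data: dict) -> dict:
        # Delegate sizing to the shared portfolio and pass the result on.
        portfolio = SharedPortfolio.instance
        if portfolio is None:
            raise RuntimeError("no portfolio has been created")
        return {**data, "amount": portfolio.allocate(data)}


SharedPortfolio(2)
wrapper_a, wrapper_b = Wrapper(), Wrapper()
result_a = wrapper_a.on_data({"strategy": "a", "signal": 1})
result_b = wrapper_b.on_data({"strategy": "b", "signal": -1})
```

Both wrappers receive their allocation from the same portfolio object, while each strategy's data stays in its own dict.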

Execution Model

It’s easiest to create an execution model by extending haymaker.execution_models.AbstractExecModel.

class haymaker.execution_models.AbstractExecModel(*, open_order={}, close_order={}, controller=None)[source]

Intermediary between Portfolio and Trader. It translates strategy signals into orders acceptable by Interactive Brokers. May also implement market execution strategy, monitor post-order events or manage any other function related to order execution.

abstractmethod onData(data, *args)[source]

Must use self.trade(ibi.Contract, ibi.Order, strategy_key, reason) to send orders for execution, and subsequently link any ibi.Trade events returned by self.trade() to required callbacks.

While opening a position, must set self.contract to the ibi.Contract that has been used.

Must keep track of current position in the market by updating self.data.position.

Args:

data (dict): is a dict created by Brick, updated by Portfolio, which must contain all parameters required to execute transactions in line with this execution model.

Returns:

(trade, note), where:

  • trade: ibi.Trade object for the issued order

  • note: info string for loggers and blotters about the character of the transaction (open, close, stop, etc.)

onStart(data, *args)[source]

Perform any initialization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.

First Atom in a pipeline (typically a data streamer) will be called by system, which is an indication that (re)start is in progress and we have successfully connected to the broker.

data by default is a dict and all keys on this dict are being set as properties on the object. Any information that needs to be passed to atoms down the chain, should be appended to data without removing any existing keys.

If overriding this method, call the superclass; the call to super().onStart(data) should be the last line in the overridden method; don’t manually emit startEvent in the subclass.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onStart(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None

Implementations

class haymaker.execution_models.BaseExecModel(*, open_order={}, close_order={}, controller=None)[source]

Bases: AbstractExecModel

Orders generated based on data sent to onData(), of which following keys are required:

  • action: must be one of: OPEN, CLOSE, REVERSE

  • signal - determines transaction direction, must be one of {-1, 1} for sell/buy respectively; irrelevant for CLOSE, where direction is determined by current position

  • contract - this ibi.Contract instance will be traded

  • amount - quantity of contract that will be traded

  • target_position - one of {-1, 0, 1} determining direction AFTER transaction is executed; will be used by Controller to verify if the transaction’s effect was as desired

Enters and closes positions based on params sent to onData(). Orders composed by open() and close(), which can be overridden or extended in subclasses to get more complex behaviour.
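
As an illustration of the keys listed above, the following sketch checks a data dict before it would reach an execution model. The helper `check_exec_data` is hypothetical and not part of Haymaker; the string used for `contract` is a placeholder for a real ibi.Contract instance.

```python
from typing import List

REQUIRED_KEYS = ("action", "signal", "contract", "amount", "target_position")
ACTIONS = {"OPEN", "CLOSE", "REVERSE"}


def check_exec_data(data: dict) -> List[str]:
    """Return a list of problems found in data (hypothetical helper)."""
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in data]
    if problems:
        return problems
    if data["action"] not in ACTIONS:
        problems.append(f"unknown action: {data['action']}")
    # signal is irrelevant for CLOSE, so only check it for other actions.
    if data["action"] != "CLOSE" and data["signal"] not in (-1, 1):
        problems.append(f"signal must be -1 or 1, got {data['signal']}")
    if data["target_position"] not in (-1, 0, 1):
        problems.append(
            f"target_position must be -1, 0 or 1, got {data['target_position']}"
        )
    return problems


example = {
    "action": "OPEN",
    "signal": 1,
    "contract": "ES",  # placeholder; in Haymaker this is an ibi.Contract
    "amount": 1,
    "target_position": 1,
}
```

An empty list from `check_exec_data(example)` means the dict carries every key that this section documents as required.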

onData(data, *args)[source]

Must use self.trade(ibi.Contract, ibi.Order, strategy_key, reason) to send orders for execution, and subsequently link any ibi.Trade events returned by self.trade() to required callbacks.

While opening a position, must set self.contract to the ibi.Contract that has been used.

Must keep track of current position in the market by updating self.data.position.

Args:

data (dict): is a dict created by Brick, updated by Portfolio, which must contain all parameters required to execute transactions in line with this execution model.

Returns:

(trade, note), where:

  • trade: ibi.Trade object for the issued order

  • note: info string for loggers and blotters about the character of the transaction (open, close, stop, etc.)

class haymaker.execution_models.EventDrivenExecModel(*, open_order={}, close_order={}, stop_order={}, tp_order={}, stop=None, take_profit=None, oca_type=2, controller=None)[source]

Bases: BaseExecModel

Use events to attach stop-loss and optional take-profit orders after execution of open order. After close transaction remove existing bracketing orders.

Parameters (keyword only):
open_order, close_order, stop_order, tp_order: dict, optional

may be used to define the order type used for respective transactions; passed dicts will be used to extend and/or update any defaults from config

stop: AbstractBracketLeg instance, must be provided

manages creation of stop-loss order

take_profit: AbstractBracketLeg instance, optional

manages creation of take-profit order

oca_type: int, default 2

OCA group type as per Interactive Brokers definition

controller: Controller instance, optional

passing Controller is meant for testing; otherwise the system should be allowed to use its own mechanisms to create it

close(data, dynamic_order_kwargs=None)[source]

In addition to the actions performed by the base class, this method attaches an OCA group that will cancel any brackets after order execution.

Return type:

Trade | None

onStart(data, *args)[source]

Perform any initialization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.

First Atom in a pipeline (typically a data streamer) will be called by system, which is an indication that (re)start is in progress and we have successfully connected to the broker.

data by default is a dict and all keys on this dict are being set as properties on the object. Any information that needs to be passed to atoms down the chain, should be appended to data without removing any existing keys.

If overriding this method, call the superclass; the call to super().onStart(data) should be the last line in the overridden method; don’t manually emit startEvent in the subclass.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onStart(self, data, *args)). If it’s async, it will be put in the asyncio loop.

open(data, dynamic_order_kwargs=None)[source]

In addition to the actions performed by the base class, this method saves the information required for bracket orders and attaches events that will place the brackets after order completion.

Return type:

Trade | None

This model automatically places stop-loss orders (and potentially take-profit orders) when the original order is filled.

Example Usage

from dataclasses import dataclass

import ib_insync as ibi
import numpy as np
import pandas as pd

from haymaker import (
    aggregators,
    app,
    base,
    bracket_legs,
    brick,
    execution_models,
    indicators,
    portfolio,
    signals,
    streamers,
)


@dataclass
class EMACrossStrategy(brick.AbstractDfBrick):
    strategy: str
    contract: ibi.Contract
    fast_lookback: int
    slow_lookback: int
    atr_lookback: int

    def df(self, df: pd.DataFrame) -> pd.DataFrame:
        df["fast_ema"] = df["close"].ewm(self.fast_lookback).mean()
        df["slow_ema"] = df["close"].ewm(self.slow_lookback).mean()
        df["signal"] = np.sign(df["fast_ema"] - df["slow_ema"])
        df["atr"] = indicators.atr(df, self.atr_lookback)
        return df


es_contract = ibi.ContFuture("ES", "CME")

portfolio.FixedPortfolio(1)

pipe = base.Pipe(
    streamers.HistoricalDataStreamer(es_contract, "10 D", "1 hour", "TRADES"),
    aggregators.BarAggregator(aggregators.NoFilter()),
    EMACrossStrategy("ema_cross_ES", es_contract, 12, 48, 24),
    signals.BinarySignalProcessor(),
    portfolio.PortfolioWrapper(),
    execution_models.EventDrivenExecModel(
        stop=bracket_legs.TrailingStop(3, vol_field="atr")
    ),
)

if __name__ == "__main__":
    app.App().run()

Warning

NOT INVESTMENT ADVICE

This example is only meant to illustrate how to use the Haymaker framework. It is unlikely to produce favorable investment outcomes.

Example Walk-Through

This is a simple example implementing a moving average crossover strategy with a stop-loss.

The strategy:

  • Buys 1 e-mini S&P futures contract (‘ES’) whenever the faster exponential moving average (EMA) crosses above the slower one.

  • Sells 1 contract when the faster EMA crosses below the slower EMA.

  • The moment a position-opening order is filled, places a trailing stop-loss order with a distance based on the current instrument’s Average True Range.

  • Whenever the stop-loss is hit, prevents reopening a position in the same direction until an opposite position is opened and closed. This protects against repeated transactions in a volatile, non-trending market.

  • Reverses the position when a signal indicates a direction opposite to the position currently held.

Defining the EMA crossover strategy
from dataclasses import dataclass
import pandas as pd
import numpy as np
from haymaker import brick, indicators
import ib_insync as ibi

@dataclass
class EMACrossStrategy(brick.AbstractDfBrick):
    strategy: str
    contract: ibi.Contract
    fast_lookback: int
    slow_lookback: int
    atr_lookback: int

    def df(self, df: pd.DataFrame) -> pd.DataFrame:
        df["fast_ema"] = df["close"].ewm(self.fast_lookback).mean()
        df["slow_ema"] = df["close"].ewm(self.slow_lookback).mean()
        df["signal"] = np.sign(df["fast_ema"] - df["slow_ema"])
        df["atr"] = indicators.atr(df, self.atr_lookback)
        return df

This defines the trading signals using haymaker.brick.AbstractDfBrick. It requires a dataclasses.dataclass with the strategy name, contract, and parameters. The haymaker.brick.AbstractDfBrick.df() method must be overridden to process a pandas.DataFrame containing market data (e.g., Open, High, Low, Close, Volume, AveragePrice), depending on the connected streamers and processors.

The data received via onData is wrapped into a pandas.DataFrame with column names matching the keys in the data dictionary. Users must ensure upstream components provide all required data for signal generation.

The haymaker.brick.AbstractDfBrick.df() method must return a pandas.DataFrame with a signal column: 1 for long, 0 for no position, -1 for short. Additional columns (e.g., atr) can be included for downstream components.
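
The signal rule used in df() is the sign of the difference between a fast and a slow EMA. The library-free sketch below reproduces that rule with a plain recursive EMA so the arithmetic is visible. It is a simplification, not the exact pandas computation: pandas' ewm() defaults to adjust=True, which weights early observations differently. Note also that the first positional argument of ewm() is com (where alpha = 1 / (1 + com)), not span (where alpha = 2 / (span + 1)). All function names here are hypothetical.

```python
from typing import List


def alpha_from_com(com: float) -> float:
    # pandas convention for ewm(com=...): alpha = 1 / (1 + com)
    return 1.0 / (1.0 + com)


def alpha_from_span(span: float) -> float:
    # pandas convention for ewm(span=...): alpha = 2 / (span + 1)
    return 2.0 / (span + 1.0)


def ema(values: List[float], alpha: float) -> List[float]:
    """Recursive EMA: y[0] = x[0], y[t] = (1 - alpha) * y[t-1] + alpha * x[t]."""
    out: List[float] = []
    for x in values:
        out.append(x if not out else (1.0 - alpha) * out[-1] + alpha * x)
    return out


def sign(x: float) -> int:
    # 1 for positive, -1 for negative, 0 for zero
    return (x > 0) - (x < 0)


def crossover_signals(closes: List[float], fast: float, slow: float) -> List[int]:
    # Lookbacks are interpreted as com, mirroring a positional ewm() argument.
    fast_ema = ema(closes, alpha_from_com(fast))
    slow_ema = ema(closes, alpha_from_com(slow))
    return [sign(f - s) for f, s in zip(fast_ema, slow_ema)]


closes = [100.0, 101.0, 102.0, 103.0, 102.0, 100.0, 97.0, 95.0]
signals = crossover_signals(closes, fast=1, slow=3)
```

With these illustrative prices the signal is 0 on the first bar, turns to 1 while prices rise, and flips to -1 once the fast EMA falls below the slow one.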

Defining the ES futures contract
es_contract = ibi.ContFuture("ES", "CME")

The ib_insync.contract.ContFuture contract is not directly tradable. Haymaker replaces it with the current on-the-run futures contract and rolls it to the next contract near expiration. Refer to other documentation sections for customization details.

Setting a fixed portfolio size
from haymaker import portfolio
portfolio.FixedPortfolio(1)

Typically, a portfolio would include more logic. Here, haymaker.portfolio.FixedPortfolio trades one contract regardless of circumstances, a simplistic approach not recommended for real use.

Assembling the pipeline
from haymaker import base, streamers, aggregators, signals, execution_models, portfolio, bracket_legs

pipe = base.Pipe(
    streamers.HistoricalDataStreamer(es_contract, "10 D", "1 hour", "TRADES"),
    aggregators.BarAggregator(aggregators.NoFilter()),
    EMACrossStrategy("ema_cross_ES", es_contract, 12, 48, 24),
    signals.BinarySignalProcessor(),
    portfolio.PortfolioWrapper(),
    execution_models.EventDrivenExecModel(
        stop=bracket_legs.TrailingStop(3, vol_field="atr")
    ),
)

The haymaker.base.Pipe connects components into an event-driven pipeline. Market data from the streamer triggers processing, potentially resulting in broker orders.

Components used:

  • Historical data streamer
    streamers.HistoricalDataStreamer(es_contract, "10 D", "1 hour", "TRADES")
    

    Pulls 10 days of 1-hour “TRADES” data for es_contract from the broker, updating with new data points. The framework automatically fills gaps if disruptions occur.

  • No-op aggregator
    aggregators.BarAggregator(aggregators.NoFilter())
    

    No aggregation or processing is applied (using haymaker.aggregators.NoFilter). An aggregator is currently required with historical data streamers to track history.

  • EMA crossover strategy instance
    EMACrossStrategy("ema_cross_ES", es_contract, 12, 48, 24)
    

    Instantiates the strategy with arbitrary parameters: 12-hour and 48-hour EMAs, 24-hour ATR.

    Note

    These parameters are illustrative and not optimized.

  • Binary signal processor
    signals.BinarySignalProcessor()
    

    The haymaker.signals.BinarySignalProcessor ensures that:

    ** Repeated signals in the same direction are ignored if a position exists.

    ** After a stop-loss, a position is not reopened in the same direction until an opposite position has been opened and closed.

    ** Positions are reversed on opposing signals.

  • Portfolio wrapper
    portfolio.PortfolioWrapper()
    

    Connects to a single global haymaker.portfolio.FixedPortfolio instance, ensuring strategy-specific data flows correctly to downstream components.

  • Event-driven execution model
    execution_models.EventDrivenExecModel(
        stop=bracket_legs.TrailingStop(3, vol_field="atr")
    )
    

    Sends an ib_insync.order.MarketOrder to the broker. Upon fill, places a trailing stop-loss order with a distance of 3×ATR (requires an atr column from the haymaker.brick.AbstractDfBrick).
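
To make the 3×ATR distance concrete, here is a small worked sketch. It only shows the arithmetic and the general behaviour of a trailing stop for a long position; the helper names and the price and ATR values are made up for illustration, and this is not how Haymaker or the broker computes or tracks the order.

```python
from typing import List


def trailing_distance(multiple: float, atr: float) -> float:
    """Distance between price and stop: multiple x ATR (hypothetical helper)."""
    return multiple * atr


def trail_long_stop(prices: List[float], distance: float) -> List[float]:
    """Stop levels for a long position: the stop ratchets up, never down."""
    stops = []
    stop = float("-inf")
    for price in prices:
        stop = max(stop, price - distance)
        stops.append(stop)
    return stops


distance = trailing_distance(3, 12.5)  # ATR of 12.5 points is an assumed value
stops = trail_long_stop([5000.0, 5020.0, 5010.0, 5040.0], distance)
```

With an assumed ATR of 12.5 points, the stop sits 37.5 points below the highest price seen since entry and stays put when the price pulls back.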

Running the application
from haymaker import app

if __name__ == "__main__":
    app.App().run()

Run this as a script (e.g., strategy.py):

Starting the strategy
python strategy.py

Ensure python is the correct interpreter. Long-running scripts should be managed as processes (see process management documentation—link TBD).

Conclusion

In real-world strategies, you’d trade multiple instruments and parameter sets. Using the patterns above with Python data structures, you can create pipelines for as many combinations as needed.
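
As a rough sketch of that idea, the combinations can be generated with ordinary Python before any pipeline is built. The symbols, lookback pairs, and naming scheme below are illustrative assumptions, not recommendations.

```python
from itertools import product

symbols = ["ES", "NQ"]            # illustrative instrument symbols
lookbacks = [(12, 48), (24, 96)]  # (fast, slow) pairs, not optimized

# One configuration per (symbol, lookback pair) combination.
configs = [
    {
        "strategy": f"ema_cross_{symbol}_{fast}_{slow}",
        "symbol": symbol,
        "fast_lookback": fast,
        "slow_lookback": slow,
    }
    for symbol, (fast, slow) in product(symbols, lookbacks)
]
```

Each entry in `configs` could then be handed to a function that assembles one base.Pipe in the same way as the walk-through above, giving every pipeline its own strategy name.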