Execution Module

Haymaker trading algorithms consist of a series of Atom components, each implementing a step in a trading algorithm. These components are piped together in an event-driven fashion.

Common trading steps include:

  • Receiving price data.

  • Processing, aggregating, or filtering data.

  • Generating trading signals.

  • Managing the portfolio.

  • Controlling risk.

  • Managing execution.

Each processing component (called an “Atom”) inherits from haymaker.base.Atom.

Atom Object

class haymaker.base.Atom[source]

Abstract base object from which all other objects inherit. It’s a basic building block for creating strategies in Haymaker. Every Atom represents a processing step typically for one traded instrument, thus allowing for separation of concerns. Connecting Atoms creates a processing pipeline.

Atoms in a pipeline communicate with each other in an event-driven manner through three methods: onStart(), onData(), onFeedback(). Those methods are called when appropriate eventkit.event.Event objects are emitted (respectively startEvent, dataEvent, feedbackEvent), and emit their own events when they are done processing thus sending a signal to the next Atom in the pipeline that it can start processing.

Users are free to put in those methods any processing logic they want, using any libraries and tools required. Atoms can be connected in any order; unions can be created by connecting more than one Atom; pipelines can be created using auxiliary class Pipeline.

contract

The contract object associated with this Atom. This can be any ib_insync.contract.Contract. On startup this contract will be qualified and available ib_insync.contract.ContractDetails will be downloaded from broker and made available through details attribute. If contract is a ib_insync.contract.ContFuture or ib_insync.contract.Future, it will be replaced with on-the-run ib_insync.contract.Future. ContFuture will pick contract that IB considers to be be current, Future allows for customization by tweaking FutureSelector. Whichever method is chose, when contract to be rolled, onContractChanged() method will be called.

This attribute doesn’t need to be set. If this Atom object is not related to any one particular contract, just don’t assign any value to this attribute.

Type:

ib_insync.contract.Contract

which_contract

default: ACTIVE; if NEXT chosen contract will return next contract in chain (relevant only for expiring contracts like futures or options) allowing for early usage of upcoming contracts for new positions a short period before they become active (number of days prior to expiry during which NEXT will be used can be configured in config.)

Type:

ActiveNext

ib

The instance of the ib_insync.ib.IB client used for interacting with the broker. It can be used to communicate with the broker if neccessary.

Type:

ClassVar[ibi.IB]

sm

Access to StateMachine which is Haymaker’s central collection of information about current positions, orders and state of strategies.

Type:

ClassVar[StateMachine]

contracts

A collection of all contracts currently in use.

Type:

ClassVar[list[ibi.Contract]]

events

Collection of eventkit.Event objects used by Haymaker, i.e. startEvent, dataEvent, feedbackEvent, appropriate methods should use these events to communicate with other objects in the chain, e.g. onStart() after processing incoming data should pass the result to the next object by calling self.dataEvent.emit(data).

Type:

ClassVar[Sequence[str]]

connect(*targets)[source]

Connect appropriate events and methods to subsequent Atom object(s) in the chain. Shorthand for this method is +=

Parameters:

targets (Atom) – One or more Atom objects to connect to. If more than one object passed, they will be connected directly in a one-to-many fashion. If the intention is to create a chain of objects, use pipe() instead.

Returns:

The updated Atom object.

Return type:

Atom

property contract_details: Details

Contract details received from the broker.

if contract is not set empty Details object will be returned.

property data: Strategy

Return strategy data if strategy has been set

property details: Details

This is a deprecated property name, which will be removed

disconnect(*targets)[source]

Disconnect passed Atom objects, which are directly connected to this atom. Shorthand for this method is -=

Parameters:

targets (Atom) – One or more Atom objects to disconnect from.

Return type:

Self

onContractChanged(old_contract, new_contract)[source]

Will be called if contract object on self.contract changes. In particular, this happens when future contract is about to expire, and new on-the-run contract replaces old, expiring contract. This method should be used to initialize any adjustment required on the object in relation to contract rolling. Actual position rolling is taken care of by Controller object.

Return type:

Optional[Awaitable[None]]

onData(data, *args)[source]

Connected to dataEvent of the preceding object in the chain. This is the entry point to any processing perfmormed by this object. Result of this processing should be added to the data dict and passed to the subsequent object in the chain using dataEvent (by calling self.dataEvent.emit(data)).

It’s up to the user to emit dataEvent with appropriate data, this event will NOT be emitted by the system, so if it’s not properly implemented, event chain will be broken. This method must be obligatorily overriden in a subclass.

Calling superclass on exit will add a timestamp with object’s name to data, which may be useful for logging.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

Optional[Awaitable[None]]

onFeedback(data, *args)[source]

Connected to feedbackEvent of the subsequent object in the chain. Allows for passing of information about trading results. It’s optional to use it, if used, overriden method must emit feedbackEvent with appropriate data. If not overriden, it will just pass received data to the previous object in the chain.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

Optional[Awaitable[None]]

onStart(data, *args)[source]

Perform any initilization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.

First Atom in a pipeline (typically a data streamer) will be called by system, which is an indication that (re)start is in progress and we have successfully connected to the broker.

data by default is a dict. Any information that needs to be passed to atoms down the chain, should be appended to data without removing any existing keys.

If overriding the class, call superclass; call to super().onStart(data)() should usually be the last line in overriden method as it will emit startEvent - basically do any processing required in sub-class and then call super-class, which will do standard processing and then emit the event to initialize processing in the next Atom down the chain; if you don’t call superclass, make sure to emit startEvent, otherwise subsequent Atoms in the chain won’t do startup initialization.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

Optional[Awaitable[None]]

pipe(*targets)[source]

Create a Pipe or a chain of connected Atom objects. Only first target will be directly connected to this object, second target will be connected to the first target and so on. It’s different from connect() method, which connects all targets directly to this object.

Returns:

Pipe object with all targets connected in a chain, where this object is the first and the last target is the last target in the list of passed targets.

Return type:

Pipe

Auxiliary Objects

class haymaker.base.Pipe(*targets)[source]

Auxiliary object for conneting several Atom objects. Atoms to be connected need to passed in the right order at initialization. Pipe itself is a subclass of Atom, so it can be connected to other Atom (or Pipe) and all Atom attributes and methods are available.

connect(*targets)[source]

Connect appropriate events and methods to subsequent Atom object(s) in the chain. Shorthand for this method is +=

Parameters:

targets (Atom) – One or more Atom objects to connect to. If more than one object passed, they will be connected directly in a one-to-many fashion. If the intention is to create a chain of objects, use pipe() instead.

Returns:

The updated Atom object.

Return type:

Atom

disconnect(*targets)[source]

Disconnect passed Atom objects, which are directly connected to this atom. Shorthand for this method is -=

Parameters:

targets (Atom) – One or more Atom objects to disconnect from.

Return type:

Self

onData(data, *args)[source]

Connected to dataEvent of the preceding object in the chain. This is the entry point to any processing perfmormed by this object. Result of this processing should be added to the data dict and passed to the subsequent object in the chain using dataEvent (by calling self.dataEvent.emit(data)).

It’s up to the user to emit dataEvent with appropriate data, this event will NOT be emitted by the system, so if it’s not properly implemented, event chain will be broken. This method must be obligatorily overriden in a subclass.

Calling superclass on exit will add a timestamp with object’s name to data, which may be useful for logging.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None

onFeedback(data, *args)[source]

Connected to feedbackEvent of the subsequent object in the chain. Allows for passing of information about trading results. It’s optional to use it, if used, overriden method must emit feedbackEvent with appropriate data. If not overriden, it will just pass received data to the previous object in the chain.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None

onStart(data, *args)[source]

Perform any initilization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.

First Atom in a pipeline (typically a data streamer) will be called by system, which is an indication that (re)start is in progress and we have successfully connected to the broker.

data by default is a dict. Any information that needs to be passed to atoms down the chain, should be appended to data without removing any existing keys.

If overriding the class, call superclass; call to super().onStart(data)() should usually be the last line in overriden method as it will emit startEvent - basically do any processing required in sub-class and then call super-class, which will do standard processing and then emit the event to initialize processing in the next Atom down the chain; if you don’t call superclass, make sure to emit startEvent, otherwise subsequent Atoms in the chain won’t do startup initialization.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None

class haymaker.base.Details(details)[source]

Wrapper object for ib_insync.contract.ContractDetails extracting and processing information that’s most relevant for Haymaker.

details

The original contract details object.

Type:

ibi.ContractDetails

trading_hours

List of tuples with start and end of trading hours for this contract.

Type:

list[tuple[datetime, datetime]]

liquid_hours

List of tuples with start and end of liquid hours for this contract.

Type:

list[tuple[datetime, datetime]]

is_liquid(_now=None)[source]

Given current time check if the market is during liquid hours for underlying contract.

Parameters:
  • _now (Optional[datetime], optional) – . Defaults to None.

  • when (If not provided current time will be used. Only situation)

  • testing. (it's useful to provide _now is in)

Returns:

True if market is liquid, False otherwise.

Return type:

bool

is_open(_now=None)[source]

Given current time check if the market is open for underlying contract.

Parameters:
  • _now (Optional[datetime], optional) – Defaults to None.

  • when (If not provided current time will be used. Only situation)

  • testing. (it's useful to provide _now is in)

Returns:

True if market is open, False otherwise.

Return type:

bool

next_open(_now=None)[source]

Return time of nearest market re-open (regardless if market is open now). Should be used after it has been tested that is_active() is False.

Parameters:
  • _now (Optional[datetime], optional) – Defaults to None.

  • when (If not provided current time will be used. Only situation)

  • testing. (it's useful to provide _now is in)

Return type:

datetime | None

property typical_close: tuple[int, int]

Return typical close time as a tuple of (hour, minute) in instrument’s timezone (not utc!).

Strategy Building Components

Building on haymaker.base.Atom, Haymaker offers skeletons of several components addressing typical requirements in building trading strategies:

  • Streamer: Connects to the broker and pipes market data.

  • Aggregator: Custom aggregation or processing of market data before signal generation.

  • Brick: Generates trading signals; this is the core building block of strategies (like bricks in a house).

    Note

    If you have an idea for a better name, email me! :)

  • Signal Processor: Filters or processes signals based on strategy state or auxiliary data, determining whether signals should trigger orders.

  • Portfolio: A global object that receives processed signals and translates them into allocations (e.g., amounts of instruments to trade). It uses data like account value, holdings, volatility, risk targets, and concentration limits.

  • Execution Model: Issues actual broker orders based on target instrument amounts.

Below is a review of how to use pre-built component modules:

Streamer

Haymaker provides streamers corresponding to all ib_insync market data subscriptions:

ib_insync Method

Streamer

reqHistoricalDataAsync

HistoricalDataStreamer

reqMktData

MktDataStreamer

reqRealTimeBars

RealTimeBarsStreamer

reqTickByTickData

TickByTickStreamer

All streamers extend haymaker.streamers.Streamer.

class haymaker.streamers.Streamer[source]
classmethod awaitables()[source]

Coroutines from all instantiated streamers. Can be passed to asyncio.gather()

Return type:

list[Awaitable]

async run()[source]

Start subscription and start emitting data. This is the main entry point into the streamer.

Return type:

None

Implementations

Every implementation accepts the same arguments as the respective ib_insync method it wraps, plus standard haymaker.streamers.Streamer parameters.

class haymaker.streamers.HistoricalDataStreamer(contract, durationStr, barSizeSetting, whatToShow, useRTH=False, formatDate=2, datastore=False, _last_bar_date=None)[source]

Stream historical and realtime bars as generated by ib_insync.IB.reqHistoricalDataAsync()

Due to specifics of IB API, bars will be streamed with ca. 5 seconds delay. If this is not suitable for your strategy, use a different streamer (most likely you need TickByTickStreamer).

Args:
  • contract: ibi.Contract

  • durationStr: str | int - can be given as either a string

acceptable by ib_insync.IB.reqHistoricalDataAsync() or number of historical bars required (which will internally be converted to appropriate duration str)

  • barSizeSetting: str

  • whatToShow: str

  • useRTH: bool = False

  • formatDate: int = 2 - keeping the default 2 ensures timestamps

will be returned as timeaware utc based; this is the only value that has been tested, other values may be incompatible with other framework components

  • datastore: bool | AsyncAbstractBaseStore = False

    ** if True, or a datastore is passed, last available datapoint

will be ready from database and only newer data will be requested;

** if False - no data will be read from datastore, only from

broker

Notes on the specifics of Interactive Brokers API:

The API broadcasts a snapshot of the database every 5 seconds.

  • Bars are timestamped at the start of the bar period

  • For bars with duration longer than 5 seconds:

    • last bar is always work in progress, it is still bound to be updated, it’s not suitable for usage yet

    • Updates happen every 5 seconds, hasNewBar will be True only 5 seconds (+processing time, typically ca. 1 sec) after new bar is ready

    • There is no way to receive new bars immediately when they’re created; it doesn’t matter how updateEvent is accessed

  • 5 second bars behave the same as RealTimeBars:

    • hasNewBar is always True

    • last bar is immediately ready (API update period is the same as bar lenght so no reason)

  • Bars shorter than 5 seconds are not allowed

async last_db_point()[source]

Return datetime for the last bar availble in the datastore for given contract.

start_date: how far back should available data be searched

Return type:

datetime | None

onContractChanged(old_contract, new_contract)[source]

Will be called if contract object on self.contract changes. In particular, this happens when future contract is about to expire, and new on-the-run contract replaces old, expiring contract. This method should be used to initialize any adjustment required on the object in relation to contract rolling. Actual position rolling is taken care of by Controller object.

Return type:

None

async run()[source]

Start subscription and start emitting data. This is the main entry point into the streamer.

Return type:

None

class haymaker.streamers.MktDataStreamer(contract, tickList)[source]

Stream market data.

class haymaker.streamers.RealTimeBarsStreamer(contract, whatToShow, useRTH, realTimeBarsOptions=<factory>)[source]

Stream realtime 5 seconds bars.

Notes on the specifics of Interactive Brokers API:
  • Bars are timestamped at the start of the bar period

  • The only possible bar interval is 5 seconds

  • Emits also happen every 5 seconds so hasNewBar is always True and the last bar is ready, i.e. it will not be adjusted subsequently

  • All bars including the last one are immediately usable

async run()[source]

The difference to superclass is that here we connect the updateEvent to intermediary function onUpdate, which needs to perform some additional checks before emitting dataEvent.

class haymaker.streamers.TickByTickStreamer(contract, tickType, numberOfTicks=0, ignoreSize=False)[source]

Stream tick by tick data.

Aggregator

class haymaker.aggregators.BarAggregator(filter, future_adjust_type='add')[source]

Aggregate recieved data bars into new bars based on the criteria specified in :attr:`filter’. Store processed data.

When future contract changes, data already in the filter will be adjusted. However, when system is started afresh right after contract changed, all back data will be for the new contract, which in some cases might be incorrect.

BarAggregator works best for strategies that don’t require large amounts of back data.

Args:

filter: one of CountBars, VolumeBars, TimeBars, NoFilter, determines how input bars are grouped into output bars

future_adjust_type: one of: “add” or “mul” on future contract change, how price bars currently in the filter are to be adjusted; resulting adjusted series will created by splicing two price series using either addition or multiplication.

adjust_future(bars)[source]

Create continuous future price series on future contract change.

Args:

bars: new price bars not currently included in self.filter() for current (post-roll) contract that the old series needs to be adjusted to

onContractChanged(old_contract, new_contract)[source]

Will be called if contract object on self.contract changes. In particular, this happens when future contract is about to expire, and new on-the-run contract replaces old, expiring contract. This method should be used to initialize any adjustment required on the object in relation to contract rolling. Actual position rolling is taken care of by Controller object.

Return type:

None

async onData(data, *args)[source]

The purpose of the queue is to avoid a race condition when onData receives several datapoints during a backfill and then has to process this data in the correct order when backfill is finished.

Return type:

None

onDataBar(bars, *args)[source]

This is connected to self.filter, will emit whatever comes from filter. Additional filtering/adjustment/conversion logic can be put here.

Backfill means this is stale data, it should not be emitted, because we don’t want this data to be treated as if it was current. It’s up to other system components to determine how we want to generate signals. However BarAggregator emits only when latest data becomes available and passes past data as such.

Return type:

None

onStart(data, *args)[source]

Syncing contract with streamer.

Return type:

None

Available Filters

class haymaker.aggregators.CountBars(count, source=None, *, label='')[source]

Group input bars into new bars corresponding to a fixed number of source bars.

Args:

count: number of source bars that constitue one output bar

on_source(new_bar, *args)[source]

Emit a new value to all connected listeners.

Parameters:

args – Argument values to emit to listeners.

Return type:

None

class haymaker.aggregators.VolumeBars(volume, source=None, *, label='')[source]

Group input bars into new bars so that each output corresponds to the same volume.

Args:

volume: desired volume of each output bar

on_source(new_bar, *args)[source]

Emit a new value to all connected listeners.

Parameters:

args – Argument values to emit to listeners.

Return type:

None

class haymaker.aggregators.TickBars(count, source=None, *, label='')[source]

Group input bars into new bars so that each output bar corresponds to the same number of ticks.

Args:

count: desired number of ticks for every output bar

on_source(new_bar, *args)[source]

Emit a new value to all connected listeners.

Parameters:

args – Argument values to emit to listeners.

Return type:

None

class haymaker.aggregators.TimeBars(timer, source=None, *, label='')[source]

Group input bars into new bars so that every output bar corresponds to the same time period.

Args:

timer: eventkit.create.Timer corresponding to desired duration of output bars

on_source(new_bar, *args)[source]

Emit a new value to all connected listeners.

Parameters:

args – Argument values to emit to listeners.

Return type:

None

class haymaker.aggregators.NoFilter(source=None, *, label='')[source]

Accumulate input bars to ensure no bars are lost during restarts. Every input bar and output bar is the same.

on_source(new_bar, *args)[source]

Emit a new value to all connected listeners.

Parameters:

args – Argument values to emit to listeners.

Return type:

None

Brick

class haymaker.brick.AbstractBaseBrick(strategy, contract)[source]
onData(data, *args)[source]

Connected to dataEvent of the preceding object in the chain. This is the entry point to any processing perfmormed by this object. Result of this processing should be added to the data dict and passed to the subsequent object in the chain using dataEvent (by calling self.dataEvent.emit(data)).

It’s up to the user to emit dataEvent with appropriate data, this event will NOT be emitted by the system, so if it’s not properly implemented, event chain will be broken. This method must be obligatorily overriden in a subclass.

Calling superclass on exit will add a timestamp with object’s name to data, which may be useful for logging.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None

onStart(data, *args)[source]

Perform any initilization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.

First Atom in a pipeline (typically a data streamer) will be called by system, which is an indication that (re)start is in progress and we have successfully connected to the broker.

data by default is a dict. Any information that needs to be passed to atoms down the chain, should be appended to data without removing any existing keys.

If overriding the class, call superclass; call to super().onStart(data)() should usually be the last line in overriden method as it will emit startEvent - basically do any processing required in sub-class and then call super-class, which will do standard processing and then emit the event to initialize processing in the next Atom down the chain; if you don’t call superclass, make sure to emit startEvent, otherwise subsequent Atoms in the chain won’t do startup initialization.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Implementations

class haymaker.brick.AbstractDfBrick(strategy, contract)[source]

Override haymaker.brick.AbstractDfBrick.df() when creating a concrete AbstractDfBrick.

Signal Processor

To better separate concerns, filter signals received from haymaker.brick.AbstractBaseBrick based on strategy state (e.g., avoid repeated signals or re-entering after a stop-out). While this could be done in haymaker.brick.AbstractBaseBrick, using a haymaker.signals.BinarySignalProcessor is more modular.

Available processors are designed for binary signals (on/off switches). For discrete signals (e.g., 0–10 strength), these implementations aren’t suitable, but you can develop custom ones.

It’s easiest to create a binary signal processor by implementing haymaker.signals.AbstractBaseBinarySignalProcessor.

class haymaker.signals.AbstractBaseBinarySignalProcessor(signal_fields='signal', state_machine=None)[source]

Process binary signals, i.e. long/short/off, as opposed to descrete signals, where signal strength is meaningful (e.g. signal assuming values -10…10 based on strength of conviction).

Actual position size or even whether the position should be taken at all is not determined here, it’s the job of Portfolio.

Actual meaning of signals is defined in sub-classes, by overriding methods: process_position() and process_no_position().

Whatever the meaning of the signal coming in, signal coming out means strategy wants to take action in the direction of signal, as indicated by keys action and signal in the emitted dict. Incoming signals that don’t require any action will be stopped here and not propagated down the chain.

In sub-class names blip means zero signal should be ignored, othewise absence of signal means there should be no position.

Args:

signal_fields - if str, single field of this name is used as signal in (open position) and signal out (close position), if tuple then first element is signal in and second is signal out

state_machine - this is for testing only and should not be passed in non-testing environment

onData(data, *args)[source]

Connected to dataEvent of the preceding object in the chain. This is the entry point to any processing perfmormed by this object. Result of this processing should be added to the data dict and passed to the subsequent object in the chain using dataEvent (by calling self.dataEvent.emit(data)).

It’s up to the user to emit dataEvent with appropriate data, this event will NOT be emitted by the system, so if it’s not properly implemented, event chain will be broken. This method must be obligatorily overriden in a subclass.

Calling superclass on exit will add a timestamp with object’s name to data, which may be useful for logging.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None

position(strategy)[source]

Which side of the market is position on: (short: -1, long: 1, no position: 0)

Return type:

Literal[-1, 0, 1]

same_direction(strategy, signal)[source]

Is signal and position in the same direction?

Return type:

bool

Implementations

class haymaker.signals.BinarySignalProcessor(signal_fields='signal', state_machine=None)[source]
  • Zero signal means close position if position exists

  • Non-zero signal means:

** open new position if there is no position for the strategy

** ignore signal if it’s in the same direction as existing position

** close position if the signal is in the direction opposite to existing position

class haymaker.signals.BlipBinarySignalProcessor(signal_fields='signal', state_machine=None)[source]
  • Zero signal means do nothing

  • Non-zero signal means:

** open new position if there is no position for the strategy

** ignore signal if it’s in the same direction as existing position

** close position if the signal is in the direction opposite to existing position

class haymaker.signals.LockableBinarySignalProcessor(signal_fields='signal', state_machine=None)[source]
  • Signals in the direction of last position are ignored (one side

of the market is ‘locked’). It’s up to StateMachine to determine which side is ‘locked’ based on position actually taken in the market (not just previously generated signals).

  • Zero signal means close position if position exists

  • Non-zero signal means:

** open new position if there is no position for the strategy

** ignore signal if it’s in the same direction as existing position

** close position if the signal is in the direction opposite to existing position

onData(data, *args)[source]

Connected to dataEvent of the preceding object in the chain. This is the entry point to any processing perfmormed by this object. Result of this processing should be added to the data dict and passed to the subsequent object in the chain using dataEvent (by calling self.dataEvent.emit(data)).

It’s up to the user to emit dataEvent with appropriate data, this event will NOT be emitted by the system, so if it’s not properly implemented, event chain will be broken. This method must be obligatorily overriden in a subclass.

Calling superclass on exit will add a timestamp with object’s name to data, which may be useful for logging.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None

class haymaker.signals.LockableBlipBinarySignalProcessor(signal_fields='signal', state_machine=None)[source]
  • Signals in the direction of last position are ignored (one side

of the market is ‘locked’). It’s up to StateMachine to determine which side is ‘locked’ based on position actually taken in the market (not just previously generated signals).

  • Zero signal means do nothing

  • Non-zero signal means:

** open new position if there is no position for the strategy

** ignore signal if it’s in the same direction as existing position

** close position if the signal is in the direction opposite to existing position

class haymaker.signals.AlwaysOnLockableBinarySignalProcessor(signal_fields='signal', state_machine=None)[source]
  • Signals in the direction of last position are ignored (one side

of the market is ‘locked’). It’s up to StateMachine to determine which side is ‘locked’ based on position actually taken in the market (not just previously generated signals).

  • Zero signal means close position if position exists

  • Non-zero signal means:

** open new position if there is no position for the strategy

** ignore signal if it’s in the same direction as existing position

** reverse position if the signal is in the direction opposite to existing position

class haymaker.signals.AlwaysOnBinarySignalProcessor(signal_fields='signal', state_machine=None)[source]
  • Zero signal means close position if position exists

  • Non-zero signal means:

** open new position if there is no position for the strategy

** ignore signal if it’s in the same direction as existing position

** close position if the signal is in the direction opposite to existing position

Factory Function

haymaker.signals.binary_signal_processor_factory(lockable=False, always_on=False)[source]

Helper function to return appropriate class based on parameters.

Parameters:
  • lockable – True - no signals in the direction of last position if the last position was stopped out (allowed, if position was closed through means other than stop-loss)

  • always_on – True - ‘CLOSE’ signal also opens position in reverese direction

Return type:

Type[AbstractBaseBinarySignalProcessor]

Portfolio

class haymaker.portfolio.AbstractBasePortfolio(*args, **kwargs)[source]

Decides what, if and how much to trade based on received signals.

Each strategy should have its own instance of portfolio to ensure that signals form various strategies should not be mixed-up. Actual singleton porfolio object should be passed to those instances, which should delegate allocation to this object.

abstractmethod allocate(data)[source]

Return desired position size in contracts. Interpretation of this number is up to execution model.

Return type:

float

Implementations

class haymaker.portfolio.FixedPortfolio(amount=1)[source]
allocate(data)[source]

Return desired position size in contracts. Interpretation of this number is up to execution model.

Return type:

float

Wrapper

class haymaker.portfolio.PortfolioWrapper[source]
onData(data, *args)[source]

Connected to dataEvent of the preceding object in the chain. This is the entry point to any processing perfmormed by this object. Result of this processing should be added to the data dict and passed to the subsequent object in the chain using dataEvent (by calling self.dataEvent.emit(data)).

It’s up to the user to emit dataEvent with appropriate data, this event will NOT be emitted by the system, so if it’s not properly implemented, event chain will be broken. This method must be obligatorily overriden in a subclass.

Calling superclass on exit will add a timestamp with object’s name to data, which may be useful for logging.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None

An instance of haymaker.portfolio.AbstractBasePortfolio should never be directly included in a processing pipeline, as there should be only one portfolio for all strategies. Instead, include an instance of haymaker.portfolio.PortfolioWrapper in a pipeline. As long as an instance of haymaker.portfolio.AbstractBasePortfolio exists in your package, haymaker.portfolio.PortfolioWrapper ensures it’s connected.

Execution Model

It’s easiest to create an execution model by extending haymaker.execution_models.AbstractExecModel.

class haymaker.execution_models.AbstractExecModel(*, open_order={}, close_order={}, controller=None)[source]

Intermediary between Portfolio and Trader. It translates strategy signals into orders acceptable by Interactive Brokers. May also implement market execution strategy, monitor post-order events or manage any other function related to order execution.

abstractmethod onData(data, *args)[source]

Must use self.trade`(:class:`ibi.Contract(), ibi.Order, strategy_key, reason) to send orders for execution, and subsequently link any ibi.Trade events returned by self.trade() to required callbacks.

While openning position must set self.contract to ibi.Contract that has been used.

Must keep track of current position in the market by updating self.data.position.

Args:

data (dict): is a dict created by Brick, updated by Portfolio, which must contain all parameters required to execute transactions in line with this execution model.

Returns:

:

(trade, note), where:

  • trade: ibi.Trade object for the issued order *

note: info string for loggers and blotters about the character of the transaction (open, close, stop, etc.)

onStart(data, *args)[source]

Perform any initilization required on system (re)start. It will be run automatically and it will be linked to startEvent of the preceding object in the chain.

First Atom in a pipeline (typically a data streamer) will be called by system, which is an indication that (re)start is in progress and we have successfully connected to the broker.

data by default is a dict. Any information that needs to be passed to atoms down the chain, should be appended to data without removing any existing keys.

If overriding the class, call superclass; call to super().onStart(data)() should usually be the last line in overriden method as it will emit startEvent - basically do any processing required in sub-class and then call super-class, which will do standard processing and then emit the event to initialize processing in the next Atom down the chain; if you don’t call superclass, make sure to emit startEvent, otherwise subsequent Atoms in the chain won’t do startup initialization.

This method can be synchronous as well as asynchronous (in the subclass it’s ok to override it with async def onData(self, data, *args)). If it’s async, it will be put in the asyncio loop.

Return type:

None

Implementations

class haymaker.execution_models.BaseExecModel(*, open_order={}, close_order={}, controller=None)[source]

Bases: AbstractExecModel

Orders generated based on data sent to onData(), of which following keys are required:

  • action: must be one of: OPEN, CLOSE, REVERSE

  • signal determines transaction direction, must be one of {-1, 1} for sell/buy respectively; irrelevant for CLOSE, where direction is determined by current position

  • contract - this ibi.Contract instance will be traded

  • amount - quantity of ``contract``s that will be traded

  • target_position - one of {-1, 0, 1} determining direction AFTER transaction is executed; will be used by Controller to verify if the transaction’s effect was as desired

Enters and closes positions based on params sent to onData(). Orders composed by open() and close(), which can be overridden or extended in subclasses to get more complex behaviour.

onData(data, *args)[source]

Must use self.trade`(:class:`ibi.Contract(), ibi.Order, strategy_key, reason) to send orders for execution, and subsequently link any ibi.Trade events returned by self.trade() to required callbacks.

While openning position must set self.contract to ibi.Contract that has been used.

Must keep track of current position in the market by updating self.data.position.

Args:

data (dict): is a dict created by Brick, updated by Portfolio, which must contain all parameters required to execute transactions in line with this execution model.

Returns:

:

(trade, note), where:

  • trade: ibi.Trade object for the issued order *

note: info string for loggers and blotters about the character of the transaction (open, close, stop, etc.)

class haymaker.execution_models.EventDrivenExecModel(*, open_order={}, close_order={}, stop_order={}, tp_order={}, stop=None, take_profit=None, oca_type=2, controller=None)[source]

Bases: BaseExecModel

Use events to attach stop-loss and optional take-profit orders after execution of open order. After close transaction remove existing bracketing orders.

Parameters (keyword only):
open_order, close_order, stop_order, tp_order: dict, optional

maybe used to define order type used for respective transactions; passed dicts will be used to extend and/or update any defaults from config

stop: AbstractBracketLeg instance, must be provided

manages creation of stop-loss order

take_profit: AbstractBracketLeg instance, optional

manages creation of take-profit order

oca_type: int, default 1

OCA group type as per Interactive Brokers definition

controller: Controller instance, optional

passing Controller is meant for testing; otherwise the system should be allowed to use its own mechanisms to create it

close(data, dynamic_order_kwargs=None)[source]

On top of actions perfomed by base class, this method will: attach oca that will cancel any brackets after order execution.

Return type:

Trade | None

open(data, dynamic_order_kwargs=None)[source]

On top of actions perfomed by base class, this method will: save information required for bracket orders and attach events that will attach brackets after order completion.

Return type:

Trade | None

This model automatically places stop-loss orders (and potentially take-profit orders) when the original order is filled.

Example Usage

from dataclasses import dataclass

import ib_insync as ibi
import numpy as np
import pandas as pd

from haymaker import (
    aggregators,
    app,
    base,
    bracket_legs,
    brick,
    execution_models,
    indicators,
    portfolio,
    signals,
    streamers,
)


@dataclass
class EMACrossStrategy(brick.AbstractDfBrick):
    strategy: str
    contract: ibi.Contract
    fast_lookback: int
    slow_lookback: int
    atr_lookback: int

    def df(self, df: pd.DataFrame) -> pd.DataFrame:
        df["fast_ema"] = df["close"].ewm(self.fast_lookback).mean()
        df["slow_ema"] = df["close"].ewm(self.slow_lookback).mean()
        df["signal"] = np.sign(df["fast_ema"] - df["slow_ema"])
        df["atr"] = indicators.atr(df, self.atr_lookback)
        return df


es_contract = ibi.ContFuture("ES", "CME")

portfolio.FixedPortfolio(1)

pipe = base.Pipe(
    streamers.HistoricalDataStreamer(es_contract, "10 D", "1 hours", "TRADES"),
    aggregators.BarAggregator(aggregators.NoFilter()),
    EMACrossStrategy("ema_cross_ES", es_contract, 12, 48, 24),
    signals.BinarySignalProcessor(),
    portfolio.PortfolioWrapper(),
    execution_models.EventDrivenExecModel(
        stop=bracket_legs.TrailingStop(3, vol_field="atr")
    ),
)

if __name__ == "__main__":
    app.App().run()

Warning

NOT INVESTMENT ADVICE

This example is only meant to illustrate how to use the Haymaker framework. It is unlikely to produce favorable investment outcomes.

Example Walk-Through

This is a simple example implementing a moving average crossover strategy with a stop-loss.

The strategy:

  • Buys 1 e-mini S&P futures contract (‘ES’) whenever the faster exponential moving average (EMA) crosses above the slower one.

  • Sells 1 contract when the faster EMA crosses below the slower EMA.

  • The moment a position-opening order is filled, places a trailing stop-loss order with a distance based on the current instrument’s Average True Range.

  • Whenever the stop-loss is hit, prevents reopening a position in the same direction until an opposite position is opened and closed. This protects against repeated transactions in a volatile, non-trending market.

  • Reverses the position when a signal indicates a direction opposite to the position currently held.

Defining the EMA crossover strategy
from dataclasses import dataclass
import pandas as pd
import numpy as np
from haymaker import brick, indicators
import ib_insync as ibi

@dataclass
class EMACrossStrategy(brick.AbstractDfBrick):
    strategy: str
    contract: ibi.Contract
    fast_lookback: int
    slow_lookback: int
    atr_lookback: int

    def df(self, df: pd.DataFrame) -> pd.DataFrame:
        df["fast_ema"] = df["close"].ewm(self.fast_lookback).mean()
        df["slow_ema"] = df["close"].ewm(self.slow_lookback).mean()
        df["signal"] = np.sign(df["fast_ema"] - df["slow_ema"])
        df["atr"] = indicators.atr(df, self.atr_lookback)
        return df

This defines the trading signals using haymaker.brick.AbstractDfBrick. It requires a dataclasses.dataclass with the strategy name, contract, and parameters. The haymaker.brick.AbstractDfBrick.df() method must be overridden to process a pandas.DataFrame containing market data (e.g., Open, High, Low, Close, Volume, AveragePrice), depending on the connected streamers and processors.

The data received via onData is wrapped into a pandas.DataFrame with column names matching the keys in the data dictionary. Users must ensure upstream components provide all required data for signal generation.

The haymaker.brick.AbstractDfBrick.df() method must return a pandas.DataFrame with a signal column: 1 for long, 0 for no position, -1 for short. Additional columns (e.g., atr) can be included for downstream components.

Defining the ES futures contract
es_contract = ibi.ContFuture("ES", "CME")

The ib_insync.contracts.ContFuture contract is not directly tradable. Haymaker replaces it with the current on-the-run futures contract and rolls it to the next contract near expiration. Refer to other documentation sections for customization details.

Setting a fixed portfolio size
from haymaker import portfolio
portfolio.FixedPortfolio(1)

Typically, a haymaker.portfolio.FixedPortfolio would include more logic. Here, it trades one contract regardless of circumstances—a simplistic approach not recommended for real use.

Assembling the pipeline
from haymaker import base, streamers, aggregators, signals, execution_models, portfolio, bracket_legs

pipe = base.Pipe(
    streamers.HistoricalDataStreamer(es_contract, "10 D", "1 hour", "TRADES"),
    aggregators.BarAggregator(aggregators.NoFilter()),
    EMACrossStrategy("ema_cross_ES", es_contract, 12, 48, 24),
    signals.BinarySignalProcessor(),
    portfolio.PortfolioWrapper(),
    execution_models.EventDrivenExecModel(
        stop=bracket_legs.TrailingStop(3, vol_field="atr")
    ),
)

The haymaker.base.Pipe connects components into an event-driven pipeline. Market data from the streamer triggers processing, potentially resulting in broker orders.

Components used:

  • Historical data streamer
    streamers.HistoricalDataStreamer(es_contract, "10 D", "1 hour", "TRADES")
    

    Pulls 10 days of 1-hour “TRADES” data for es_contract from the broker, updating with new data points. The framework automatically fills gaps if disruptions occur.

  • No-op aggregator
    aggregators.BarAggregator(aggregators.NoFilter())
    

    No aggregation or processing is applied (using haymaker.aggregators.NoFilter). An aggregator is currently required with historical data streamers to track history.

  • EMA crossover strategy instance
    EMACrossStrategy("ema_cross_ES", es_contract, 12, 48, 24)
    

    Instantiates the strategy with arbitrary parameters: 12-hour and 48-hour EMAs, 24-hour ATR.

    Note

    These parameters are illustrative and not optimized.

  • Binary signal processor
    signals.BinarySignalProcessor()
    

    The haymaker.signals.BinarySignalProcessor ensures: * Repeated signals in the same direction are ignored if a position exists. * Post-stop-loss, prevents reopening in the same direction until an opposite position is cycled. * Reverses positions on opposing signals.

  • Portfolio wrapper
    portfolio.PortfolioWrapper()
    

    Connects to a single global haymaker.portfolio.FixedPortfolio instance, ensuring strategy-specific data flows correctly to downstream components.

  • Event-driven execution model
    execution_models.EventDrivenExecModel(
        stop=bracket_legs.TrailingStop(3, vol_field="atr")
    )
    

    Sends a ib_insync.order.MarketOrder to the broker. Upon fill, places a trailing stop-loss order with a distance of 3×ATR (requires an atr column from the haymaker.brick.AbstractDfBrick).

Running the application
from haymaker import app

if __name__ == "__main__":
    app.App().run()

Run this as a script (e.g., strategy.py):

Starting the strategy
python strategy.py

Ensure python is the correct interpreter. Long-running scripts should be managed as processes (see process management documentation—link TBD).

Conclusion

In real-world strategies, you’d trade multiple instruments and parameter sets. Using the patterns above with Python data structures, you can create pipelines for as many combinations as needed.