можно просто попытаться как вы пишете отфильтровать, для этого в начале получить определенную структуру и тип данных:
источникДанных
.мап(созданиеСтруктуры)
.фильтр(текущаяЗапись => СписокТребуемыхНомеров.содержит(текущаяЗапись.телефон)
&& текущаяЗапись.дата<>требуемыйИнтервал)
так будет работать, но очень долго, медленно и сожрет кучу ресурсов на одной машине - это не то ради чего спарк используют, спарк - это движек для распределенных вычислений. А чтоб запустить распределенные вычисления, нужно в начале создать пару "ключ"->"значение" (где ключ номер телефона, а значение все остальные данные), эти пары распределятся по узлам, где будут параллельно обрабатываться, а потом результат паралельной обработки агрегировать в один общий результат, и для этого не фильтр использовать а reduceByKey с aggregate, для паралельного сбора ключей и значений для этих ключей.